You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/05 07:34:59 UTC
[incubator-inlong] branch master updated: support keeping message order (#2921)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 99fb55a support keeping message order (#2921)
99fb55a is described below
commit 99fb55ad548ec86d9bad6bfab843e5f9e7ca4460
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Mar 5 15:34:55 2022 +0800
support keeping message order (#2921)
---
.../apache/inlong/dataproxy/base/OrderEvent.java | 62 +++++++++++++++++
.../channel/FailoverChannelProcessor.java | 4 +-
.../dataproxy/channel/FailoverChannelSelector.java | 19 +++++-
.../dataproxy/consts/AttributeConstants.java | 6 ++
.../apache/inlong/dataproxy/sink/PulsarSink.java | 18 +++--
.../dataproxy/sink/pulsar/PulsarClientService.java | 73 +++++++++++++++-----
.../dataproxy/source/ServerMessageHandler.java | 70 +++++++++++---------
.../inlong/dataproxy/utils/MessageUtils.java | 77 ++++++++++++++++++++++
.../sdk/dataproxy/codec/ProtocolDecoder.java | 2 +-
.../sdk/dataproxy/codec/ProtocolEncoder.java | 2 +-
10 files changed, 277 insertions(+), 56 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/OrderEvent.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/OrderEvent.java
new file mode 100644
index 0000000..ef42fff
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/OrderEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.base;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.Map;
+import org.apache.flume.Event;
+
+public class OrderEvent implements Event {
+
+ private ChannelHandlerContext ctx;
+
+ private Event event;
+
+ public OrderEvent(ChannelHandlerContext ctx, Event event) {
+ this.ctx = ctx;
+ this.event = event;
+ }
+
+ @Override
+ public Map<String, String> getHeaders() {
+ return event.getHeaders();
+ }
+
+ @Override
+ public void setHeaders(Map<String, String> map) {
+ event.setHeaders(map);
+ }
+
+ @Override
+ public byte[] getBody() {
+ return event.getBody();
+ }
+
+ @Override
+ public void setBody(byte[] bytes) {
+ event.setBody(bytes);
+ }
+
+ /**
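+ * Returns the Netty channel handler context supplied when this event was created.
+ * @return the channel handler context held by this event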
+ */
+ public ChannelHandlerContext getCtx() {
+ return ctx;
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
index 5945ee6..8f898f1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
@@ -36,6 +36,7 @@ import org.apache.flume.interceptor.InterceptorBuilderFactory;
import org.apache.flume.interceptor.InterceptorChain;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -241,7 +242,6 @@ public class FailoverChannelProcessor
boolean success = true;
List<Channel> requiredChannels = selector.getRequiredChannels(event);
-
for (Channel reqChannel : requiredChannels) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
@@ -272,7 +272,7 @@ public class FailoverChannelProcessor
}
}
- if (!success) {
+ if (!success && !MessageUtils.isSyncSendForOrder(event)) {
List<Channel> optionalChannels = selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
Transaction tx = null;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
index 389c901..58b1063 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
@@ -28,7 +28,9 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.AbstractChannelSelector;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +43,13 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
private static final String TRANSFER_CHANNEL = "transfer";
private static final String FILE_METRIC_CHANNEL = "fileMetric";
private static final String SLA_METRIC_CHANNEL = "slaMetric";
+ private static final String ORDER_CHANNEL = "order";
private int masterIndex = 0;
private int slaveIndex = 0;
private final List<Channel> masterChannels = new ArrayList<Channel>();
+ private final List<Channel> orderChannels = new ArrayList<Channel>();
private final List<Channel> slaveChannels = new ArrayList<Channel>();
private final List<Channel> transferChannels = new ArrayList<Channel>();
private final List<Channel> agentFileMetricChannels = new ArrayList<Channel>();
@@ -59,8 +63,15 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
} else if (event.getHeaders().containsKey(ConfigConstants.FILE_CHECK_DATA)) {
retChannels.add(agentFileMetricChannels.get(0));
} else if (event.getHeaders().containsKey(ConfigConstants.SLA_METRIC_DATA)) {
-
retChannels.add(slaMetricChannels.get(0));
+ } else if (MessageUtils.isSyncSendForOrder(event.getHeaders()
+ .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
+ if (partitionKey == null) {
+ partitionKey = "";
+ }
+ int channelIndex = Math.abs(partitionKey.hashCode()) % orderChannels.size();
+ retChannels.add(orderChannels.get(channelIndex));
} else {
retChannels.add(masterChannels.get(masterIndex));
masterIndex = (masterIndex + 1) % masterChannels.size();
@@ -76,7 +87,6 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
} else if (event.getHeaders().containsKey(ConfigConstants.FILE_CHECK_DATA)) {
retChannels.add(agentFileMetricChannels.get(0));
} else if (event.getHeaders().containsKey(ConfigConstants.SLA_METRIC_DATA)) {
-
retChannels.add(slaMetricChannels.get(1));
} else {
retChannels.add(slaveChannels.get(slaveIndex));
@@ -108,6 +118,7 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
String transfer = context.getString(TRANSFER_CHANNEL);
String fileMertic = context.getString(FILE_METRIC_CHANNEL);
String slaMetric = context.getString(SLA_METRIC_CHANNEL);
+ String orderMetric = context.getString(ORDER_CHANNEL);
if (StringUtils.isEmpty(masters)) {
throw new FlumeException("master channel is null!");
}
@@ -115,6 +126,7 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
List<String> transferList = splitChannelName(transfer);
List<String> fileMetricList = splitChannelName(fileMertic);
List<String> slaMetricList = splitChannelName(slaMetric);
+ List<String> orderMetricList = splitChannelName(orderMetric);
for (Map.Entry<String, Channel> entry : getChannelNameMap().entrySet()) {
String channelName = entry.getKey();
@@ -127,11 +139,14 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
this.agentFileMetricChannels.add(channel);
} else if (slaMetricList.contains(channelName)) {
this.slaMetricChannels.add(channel);
+ } else if (orderMetricList.contains(channelName)) {
+ this.orderChannels.add(channel);
} else {
this.slaveChannels.add(channel);
}
}
LOG.info("masters:" + this.masterChannels);
+ LOG.info("orders:" + this.orderChannels);
LOG.info("slaves:" + this.slaveChannels);
LOG.info("transfers:" + this.transferChannels);
LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index cc8f378..c4cbbd5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -80,4 +80,10 @@ public interface AttributeConstants {
String GROUPID_NUM = "groupIdnum";
String STREAMID_NUM = "streamIdnum";
+
+ String MESSAGE_PARTITION_KEY = "partitionKey";
+
+ String MESSAGE_SYNC_SEND = "syncSend";
+
+ String MESSAGE_IS_ACK = "isAck";
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 10f230c..64be21f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -158,6 +158,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
private static final String SEPARATOR = "#";
private boolean isNewMetricOn = true;
+ private boolean keepOrder = false;
private MonitorIndex monitorIndex;
@@ -218,6 +219,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
public void configure(Context context) {
logger.info("PulsarSink started and context = {}", context.toString());
isNewMetricOn = context.getBoolean("new-metric-on", true);
+ keepOrder = context.getBoolean("keep-order", false);
maxMonitorCnt = context.getInteger("max-monitor-cnt", 300000);
configManager = ConfigManager.getInstance();
@@ -266,7 +268,13 @@ public class PulsarSink extends AbstractSink implements Configurable,
resendQueue = new LinkedBlockingQueue<EventStat>(badEventQueueSize);
Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
- sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
+ if (keepOrder) {
+ logger.info("This is order pulsar sink!");
+ sinkThreadPool = new Thread[1];
+ } else {
+ sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
+ }
+
eventQueueSize = pulsarConfig.getEventQueueSize();
eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
@@ -557,12 +565,14 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
if (keyPostfix != null && !keyPostfix.equals("")) {
- monitorIndex.addAndGet(new String(newbase), 0, 0, 0, (int) tMsgCounterL);
+ monitorIndex.addAndGet(new String(newbase), 0, 0,
+ 0, (int) tMsgCounterL);
if (logPrinterB.shouldPrint()) {
logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
}
} else {
- monitorIndex.addAndGet(new String(newbase), (int) tMsgCounterL, 1, messageSize, 0);
+ monitorIndex.addAndGet(new String(newbase), (int) tMsgCounterL,
+ 1, messageSize, 0);
}
}
}
@@ -582,7 +592,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
@Override
- public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) {
+ public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) {
/*
* Statistics pulsar performance
*/
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 7c518fa..87f67dd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -18,21 +18,26 @@
package org.apache.inlong.dataproxy.sink.pulsar;
import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
+import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.EventStat;
+import org.apache.inlong.dataproxy.source.MsgType;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -95,7 +100,7 @@ public class PulsarClientService {
sendTimeout = pulsarConfig.getSendTimeoutMs();
retryIntervalWhenSendMsgError = pulsarConfig.getRetryIntervalWhenSendErrorMs();
clientTimeout = pulsarConfig.getClientTimeoutSecond();
- logger.debug("PulsarClientService " + sendTimeout);
+
Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
pulsarClientIoThreads = pulsarConfig.getPulsarClientIoThreads();
@@ -177,25 +182,63 @@ public class PulsarClientService {
proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
TopicProducerInfo forCallBackP = producer;
- forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
- .sendAsync().thenAccept((msgId) -> {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- forCallBackP.setCanUseSend(true);
- sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
- }).exceptionally((e) -> {
- if (streamConfigLogMetric != null) {
- streamConfigLogMetric.updateConfigLog(inlongGroupId,
- inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
- ConfigLogTypeEnum.ERROR, e.toString());
+
+ if (MessageUtils.isSyncSendForOrder(event) && (event instanceof OrderEvent)) {
+ String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
+ try {
+ MessageId msgId = forCallBackP.getProducer().newMessage().key(partitionKey)
+ .properties(proMap).value(event.getBody())
+ .send();
+ sendResponse((OrderEvent)event);
+ sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
+ AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+ forCallBackP.setCanUseSend(true);
+ } catch (PulsarClientException ex) {
+ if (streamConfigLogMetric != null) {
+ streamConfigLogMetric.updateConfigLog(inlongGroupId,
+ inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+ ConfigLogTypeEnum.ERROR, ex.toString());
+ }
+ forCallBackP.setCanUseSend(false);
+ sendMessageCallBack.handleMessageSendException(topic, es, ex);
}
- forCallBackP.setCanUseSend(false);
- sendMessageCallBack.handleMessageSendException(topic, es, e);
- return null;
- });
+
+ } else {
+ forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
+ .sendAsync().thenAccept((msgId) -> {
+ AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+ forCallBackP.setCanUseSend(true);
+ sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
+ }).exceptionally((e) -> {
+ if (streamConfigLogMetric != null) {
+ streamConfigLogMetric.updateConfigLog(inlongGroupId,
+ inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+ ConfigLogTypeEnum.ERROR, e.toString());
+ }
+ forCallBackP.setCanUseSend(false);
+ sendMessageCallBack.handleMessageSendException(topic, es, e);
+ return null;
+ });
+ }
return true;
}
/**
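+ * Writes the response package to the event's channel if that channel is still active.
+ * @param orderEvent event carrying the channel context and the uniq id used in the response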
+ */
+ private void sendResponse(OrderEvent orderEvent) {
+ if (orderEvent.getCtx() != null && orderEvent.getCtx().channel().isActive()) {
+ orderEvent.getCtx().channel().eventLoop().execute(() -> {
+ ByteBuf binBuffer = MessageUtils.getResponsePackage("",
+ MsgType.MSG_BIN_MULTI_BODY,
+ orderEvent.getHeaders().get(AttributeConstants.UNIQ_ID));
+ orderEvent.getCtx().writeAndFlush(binBuffer);
+ });
+ }
+ }
+
+ /**
* If this function is called successively without calling {@see #destroyConnection()}, only the
* first call has any effect.
*
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index bf15ce5..9aa60e3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -48,6 +48,7 @@ import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -57,6 +58,7 @@ import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -381,7 +383,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
}
- private void formatMessagesAndSend(Map<String, String> commonAttrMap,
+ private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
String strRemoteIP, MsgType msgType) throws MessageIDException {
@@ -425,6 +427,16 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
}
+ String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND);
+ if (StringUtils.isNotEmpty(syncSend)) {
+ headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend);
+ }
+
+ String partitionKey = commonAttrMap.get(AttributeConstants.MESSAGE_PARTITION_KEY);
+ if (StringUtils.isNotEmpty(partitionKey)) {
+ headers.put(AttributeConstants.MESSAGE_PARTITION_KEY, partitionKey);
+ }
+
headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
headers.put(AttributeConstants.GROUP_ID,
streamIdEntry.getValue().get(0).getGroupId());
@@ -438,6 +450,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
+ headers.put(AttributeConstants.UNIQ_ID,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
if (StringUtils.isNotEmpty(sequenceId)) {
StringBuilder sidBuilder = new StringBuilder();
@@ -448,7 +462,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
Event event = EventBuilder.withBody(data, headers);
-
+ if (MessageUtils.isSyncSendForOrder(event)) {
+ event = new OrderEvent(ctx, event);
+ }
long dtten = 0;
try {
dtten = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
@@ -539,32 +555,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (resultMap.containsKey(ConfigConstants.DECODER_ATTRS)) {
backattrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
}
-
- int binTotalLen = 1 + 4 + 2 + 2;
- if (null != backattrs) {
- binTotalLen += backattrs.length();
- }
-
- ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
- binBuffer.writeInt(binTotalLen);
- binBuffer.writeByte(msgType.getValue());
-
- long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
- byte[] uniq = new byte[4];
- uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
- uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
- uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
- uniq[3] = (byte) (uniqVal & 0xFF);
- binBuffer.writeBytes(uniq);
-
- if (null != backattrs) {
- binBuffer.writeShort(backattrs.length());
- binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
- } else {
- binBuffer.writeShort(0x0);
- }
-
- binBuffer.writeShort(0xee01);
+ String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID);
+ ByteBuf binBuffer = MessageUtils.getResponsePackage(backattrs, msgType, uniqVal);
if (remoteChannel.isWritable()) {
remoteChannel.writeAndFlush(binBuffer);
logger.debug("Connection info: {} ; attr is {} ; uniqVal {}",
@@ -647,16 +639,23 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
- formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
+ formatMessagesAndSend(ctx, commonAttrMap, messageMap,
+ strRemoteIP, msgType);
} else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
Map<String, String> headers = new HashMap<String, String>();
headers.put("msgtype", "filestatus");
headers.put(ConfigConstants.FILE_CHECK_DATA,
"true");
+ headers.put(AttributeConstants.UNIQ_ID,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
for (ProxyMessage message : msgList) {
byte[] body = message.getData();
Event event = EventBuilder.withBody(body, headers);
+ if (MessageUtils.isSyncSendForOrder(commonAttrMap
+ .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ event = new OrderEvent(ctx, event);
+ }
try {
processor.processEvent(event);
this.addMetric(true, body.length, event);
@@ -674,9 +673,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
headers.put("msgtype", "measure");
headers.put(ConfigConstants.FILE_CHECK_DATA,
"true");
+ headers.put(AttributeConstants.UNIQ_ID,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
for (ProxyMessage message : msgList) {
byte[] body = message.getData();
Event event = EventBuilder.withBody(body, headers);
+ if (MessageUtils.isSyncSendForOrder(commonAttrMap
+ .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ event = new OrderEvent(ctx, event);
+ }
try {
processor.processEvent(event);
this.addMetric(true, body.length, event);
@@ -689,8 +694,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
}
SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
- responsePackage(ctx, commonAttrMap, resultMap, remoteChannel,
- remoteSocketAddress, msgType);
+ if (!MessageUtils.isSyncSendForOrder(commonAttrMap
+ .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ responsePackage(ctx, commonAttrMap, resultMap, remoteChannel,
+ remoteSocketAddress, msgType);
+ }
} finally {
cb.release();
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
new file mode 100644
index 0000000..02b83c8
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.source.MsgType;
+
+public class MessageUtils {
+
+ /**
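+ * Checks whether a message must be sent synchronously to keep message order.
+ * @param syncSend value of the sync-send attribute; may be null or empty
+ * @return true only if syncSend equals "true", otherwise false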
+ */
+ public static boolean isSyncSendForOrder(String syncSend) {
+ if (StringUtils.isNotEmpty(syncSend) && "true".equals(syncSend)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
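+ * Checks whether an event must be sent synchronously to keep message order.
+ * @param event event whose headers carry the sync-send attribute
+ * @return true only if the event's "syncSend" header equals "true", otherwise false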
+ */
+ public static boolean isSyncSendForOrder(Event event) {
+ String syncSend = event.getHeaders().get(AttributeConstants.MESSAGE_SYNC_SEND);
+ return isSyncSendForOrder(syncSend);
+ }
+
+ public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+ int binTotalLen = 1 + 4 + 2 + 2;
+ if (null != backattrs) {
+ binTotalLen += backattrs.length();
+ }
+ ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
+ binBuffer.writeInt(binTotalLen);
+ binBuffer.writeByte(msgType.getValue());
+
+ long uniqVal = Long.parseLong(sequenceId);
+ byte[] uniq = new byte[4];
+ uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
+ uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
+ uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
+ uniq[3] = (byte) (uniqVal & 0xFF);
+ binBuffer.writeBytes(uniq);
+
+ if (null != backattrs) {
+ binBuffer.writeShort(backattrs.length());
+ binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+ } else {
+ binBuffer.writeShort(0x0);
+ }
+ binBuffer.writeShort(0xee01);
+ return binBuffer;
+ }
+
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index db86efd..579b07b 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -32,7 +32,7 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
@Override
- protected void decode(ChannelHandlerContext var1,
+ protected void decode(ChannelHandlerContext ctx,
ByteBuf buffer, List<Object> out) throws Exception {
buffer.markReaderIndex();
// totallen
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index 3cfa2f9..b9b68a0 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -46,7 +46,7 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
private static final Logger logger = LoggerFactory
.getLogger(ProtocolEncoder.class);
- protected void encode(ChannelHandlerContext var1,
+ protected void encode(ChannelHandlerContext ctx,
EncodeObject message, List<Object> out) throws Exception {
ByteBuf buf = null;
try {