You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/05 07:34:59 UTC

[incubator-inlong] branch master updated: support keeping message order (#2921)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 99fb55a  support keeping message order (#2921)
99fb55a is described below

commit 99fb55ad548ec86d9bad6bfab843e5f9e7ca4460
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Mar 5 15:34:55 2022 +0800

    support keeping message order (#2921)
---
 .../apache/inlong/dataproxy/base/OrderEvent.java   | 62 +++++++++++++++++
 .../channel/FailoverChannelProcessor.java          |  4 +-
 .../dataproxy/channel/FailoverChannelSelector.java | 19 +++++-
 .../dataproxy/consts/AttributeConstants.java       |  6 ++
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 18 +++--
 .../dataproxy/sink/pulsar/PulsarClientService.java | 73 +++++++++++++++-----
 .../dataproxy/source/ServerMessageHandler.java     | 70 +++++++++++---------
 .../inlong/dataproxy/utils/MessageUtils.java       | 77 ++++++++++++++++++++++
 .../sdk/dataproxy/codec/ProtocolDecoder.java       |  2 +-
 .../sdk/dataproxy/codec/ProtocolEncoder.java       |  2 +-
 10 files changed, 277 insertions(+), 56 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/OrderEvent.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/OrderEvent.java
new file mode 100644
index 0000000..ef42fff
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/base/OrderEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.base;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.Map;
+import org.apache.flume.Event;
+
+/** Flume {@link Event} wrapper that also carries the Netty context of the originating request. */
+public class OrderEvent implements Event {
+
+    private final ChannelHandlerContext ctx;
+    private final Event event;
+
+    public OrderEvent(ChannelHandlerContext ctx, Event event) {
+        this.ctx = ctx;
+        this.event = event;
+    }
+
+    @Override
+    public Map<String, String> getHeaders() {
+        return event.getHeaders();
+    }
+
+    @Override
+    public void setHeaders(Map<String, String> map) {
+        event.setHeaders(map);
+    }
+
+    @Override
+    public byte[] getBody() {
+        return event.getBody();
+    }
+
+    @Override
+    public void setBody(byte[] bytes) {
+        event.setBody(bytes);
+    }
+
+    /**
+     * Returns the channel context handed to the constructor.
+     * @return the channel handler context, exactly as supplied (may be null)
+     */
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
index 5945ee6..8f898f1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
@@ -36,6 +36,7 @@ import org.apache.flume.interceptor.InterceptorBuilderFactory;
 import org.apache.flume.interceptor.InterceptorChain;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -241,7 +242,6 @@ public class FailoverChannelProcessor
 
         boolean success = true;
         List<Channel> requiredChannels = selector.getRequiredChannels(event);
-
         for (Channel reqChannel : requiredChannels) {
             Transaction tx = reqChannel.getTransaction();
             Preconditions.checkNotNull(tx, "Transaction object must not be null");
@@ -272,7 +272,7 @@ public class FailoverChannelProcessor
             }
         }
 
-        if (!success) {
+        if (!success && !MessageUtils.isSyncSendForOrder(event)) {
             List<Channel> optionalChannels = selector.getOptionalChannels(event);
             for (Channel optChannel : optionalChannels) {
                 Transaction tx = null;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
index 389c901..58b1063 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelSelector.java
@@ -28,7 +28,9 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.channel.AbstractChannelSelector;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +43,13 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
     private static final String TRANSFER_CHANNEL = "transfer";
     private static final String FILE_METRIC_CHANNEL = "fileMetric";
     private static final String SLA_METRIC_CHANNEL = "slaMetric";
+    private static final String ORDER_CHANNEL = "order";
 
     private int masterIndex = 0;
     private int slaveIndex = 0;
 
     private final List<Channel> masterChannels = new ArrayList<Channel>();
+    private final List<Channel> orderChannels = new ArrayList<Channel>();
     private final List<Channel> slaveChannels = new ArrayList<Channel>();
     private final List<Channel> transferChannels = new ArrayList<Channel>();
     private final List<Channel> agentFileMetricChannels = new ArrayList<Channel>();
@@ -59,8 +63,15 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
         } else if (event.getHeaders().containsKey(ConfigConstants.FILE_CHECK_DATA)) {
             retChannels.add(agentFileMetricChannels.get(0));
         } else if (event.getHeaders().containsKey(ConfigConstants.SLA_METRIC_DATA)) {
-
             retChannels.add(slaMetricChannels.get(0));
+        } else if (MessageUtils.isSyncSendForOrder(event.getHeaders()
+                .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+            String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
+            if (partitionKey == null) {
+                partitionKey = "";
+            }
+            int channelIndex = Math.abs(partitionKey.hashCode()) % orderChannels.size();
+            retChannels.add(orderChannels.get(channelIndex));
         } else {
             retChannels.add(masterChannels.get(masterIndex));
             masterIndex = (masterIndex + 1) % masterChannels.size();
@@ -76,7 +87,6 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
         } else if (event.getHeaders().containsKey(ConfigConstants.FILE_CHECK_DATA)) {
             retChannels.add(agentFileMetricChannels.get(0));
         } else if (event.getHeaders().containsKey(ConfigConstants.SLA_METRIC_DATA)) {
-
             retChannels.add(slaMetricChannels.get(1));
         } else {
             retChannels.add(slaveChannels.get(slaveIndex));
@@ -108,6 +118,7 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
         String transfer = context.getString(TRANSFER_CHANNEL);
         String fileMertic = context.getString(FILE_METRIC_CHANNEL);
         String slaMetric = context.getString(SLA_METRIC_CHANNEL);
+        String orderMetric = context.getString(ORDER_CHANNEL);
         if (StringUtils.isEmpty(masters)) {
             throw new FlumeException("master channel is null!");
         }
@@ -115,6 +126,7 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
         List<String> transferList = splitChannelName(transfer);
         List<String> fileMetricList = splitChannelName(fileMertic);
         List<String> slaMetricList = splitChannelName(slaMetric);
+        List<String> orderMetricList = splitChannelName(orderMetric);
 
         for (Map.Entry<String, Channel> entry : getChannelNameMap().entrySet()) {
             String channelName = entry.getKey();
@@ -127,11 +139,14 @@ public class FailoverChannelSelector extends AbstractChannelSelector {
                 this.agentFileMetricChannels.add(channel);
             } else if (slaMetricList.contains(channelName)) {
                 this.slaMetricChannels.add(channel);
+            } else if (orderMetricList.contains(channelName)) {
+                this.orderChannels.add(channel);
             } else {
                 this.slaveChannels.add(channel);
             }
         }
         LOG.info("masters:" + this.masterChannels);
+        LOG.info("orders:" + this.orderChannels);
         LOG.info("slaves:" + this.slaveChannels);
         LOG.info("transfers:" + this.transferChannels);
         LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index cc8f378..c4cbbd5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -80,4 +80,10 @@ public interface AttributeConstants {
     String GROUPID_NUM = "groupIdnum";
 
     String STREAMID_NUM = "streamIdnum";
+
+    String MESSAGE_PARTITION_KEY = "partitionKey";
+
+    String MESSAGE_SYNC_SEND = "syncSend";
+
+    String MESSAGE_IS_ACK = "isAck";
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 10f230c..64be21f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -158,6 +158,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
 
     private static final String SEPARATOR = "#";
     private boolean isNewMetricOn = true;
+    private boolean keepOrder = false;
 
 
     private MonitorIndex monitorIndex;
@@ -218,6 +219,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
     public void configure(Context context) {
         logger.info("PulsarSink started and context = {}", context.toString());
         isNewMetricOn = context.getBoolean("new-metric-on", true);
+        keepOrder = context.getBoolean("keep-order", false);
         maxMonitorCnt = context.getInteger("max-monitor-cnt", 300000);
 
         configManager = ConfigManager.getInstance();
@@ -266,7 +268,13 @@ public class PulsarSink extends AbstractSink implements Configurable,
         resendQueue = new LinkedBlockingQueue<EventStat>(badEventQueueSize);
 
         Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
-        sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
+        if (keepOrder) {
+            logger.info("This is order pulsar sink!");
+            sinkThreadPool = new Thread[1];
+        } else {
+            sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
+        }
+
         eventQueueSize = pulsarConfig.getEventQueueSize();
         eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
 
@@ -557,12 +565,14 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     }
 
                     if (keyPostfix != null && !keyPostfix.equals("")) {
-                        monitorIndex.addAndGet(new String(newbase), 0, 0, 0, (int) tMsgCounterL);
+                        monitorIndex.addAndGet(new String(newbase), 0, 0,
+                                0, (int) tMsgCounterL);
                         if (logPrinterB.shouldPrint()) {
                             logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
                         }
                     } else {
-                        monitorIndex.addAndGet(new String(newbase), (int) tMsgCounterL, 1, messageSize, 0);
+                        monitorIndex.addAndGet(new String(newbase), (int) tMsgCounterL,
+                                1, messageSize, 0);
                     }
                 }
             }
@@ -582,7 +592,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
     }
 
     @Override
-    public void handleMessageSendSuccess(String topic, Object result,  EventStat eventStat) {
+    public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) {
         /*
          * Statistics pulsar performance
          */
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 7c518fa..87f67dd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -18,21 +18,26 @@
 package org.apache.inlong.dataproxy.sink.pulsar;
 
 import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
 import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
+import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.EventStat;
+import org.apache.inlong.dataproxy.source.MsgType;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -95,7 +100,7 @@ public class PulsarClientService {
         sendTimeout = pulsarConfig.getSendTimeoutMs();
         retryIntervalWhenSendMsgError = pulsarConfig.getRetryIntervalWhenSendErrorMs();
         clientTimeout = pulsarConfig.getClientTimeoutSecond();
-        logger.debug("PulsarClientService " + sendTimeout);
+
         Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
 
         pulsarClientIoThreads = pulsarConfig.getPulsarClientIoThreads();
@@ -177,25 +182,63 @@ public class PulsarClientService {
         proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
 
         TopicProducerInfo forCallBackP = producer;
-        forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
-                .sendAsync().thenAccept((msgId) -> {
-            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-            forCallBackP.setCanUseSend(true);
-            sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
-        }).exceptionally((e) -> {
-            if (streamConfigLogMetric != null) {
-                streamConfigLogMetric.updateConfigLog(inlongGroupId,
-                        inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
-                        ConfigLogTypeEnum.ERROR, e.toString());
+
+        if (MessageUtils.isSyncSendForOrder(event) && (event instanceof OrderEvent)) {
+            String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
+            try {
+                MessageId msgId = forCallBackP.getProducer().newMessage().key(partitionKey)
+                        .properties(proMap).value(event.getBody())
+                        .send();
+                sendResponse((OrderEvent)event);
+                sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
+                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+                forCallBackP.setCanUseSend(true);
+            } catch (PulsarClientException ex) {
+                if (streamConfigLogMetric != null) {
+                    streamConfigLogMetric.updateConfigLog(inlongGroupId,
+                            inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+                            ConfigLogTypeEnum.ERROR, ex.toString());
+                }
+                forCallBackP.setCanUseSend(false);
+                sendMessageCallBack.handleMessageSendException(topic, es, ex);
             }
-            forCallBackP.setCanUseSend(false);
-            sendMessageCallBack.handleMessageSendException(topic, es, e);
-            return null;
-        });
+
+        } else {
+            forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
+                    .sendAsync().thenAccept((msgId) -> {
+                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+                forCallBackP.setCanUseSend(true);
+                sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
+            }).exceptionally((e) -> {
+                if (streamConfigLogMetric != null) {
+                    streamConfigLogMetric.updateConfigLog(inlongGroupId,
+                            inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+                            ConfigLogTypeEnum.ERROR, e.toString());
+                }
+                forCallBackP.setCanUseSend(false);
+                sendMessageCallBack.handleMessageSendException(topic, es, e);
+                return null;
+            });
+        }
         return true;
     }
 
     /**
+     * Sends the response package for an order event on the channel held by the event.
+     * @param orderEvent event supplying the channel context and the uniq id header
+     */
+    private void sendResponse(OrderEvent orderEvent) {
+        if (orderEvent.getCtx() == null || !orderEvent.getCtx().channel().isActive()) {
+            return;
+        }
+        orderEvent.getCtx().channel().eventLoop().execute(() -> {
+            String uniqId = orderEvent.getHeaders().get(AttributeConstants.UNIQ_ID);
+            ByteBuf ack = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, uniqId);
+            orderEvent.getCtx().writeAndFlush(ack);
+        });
+    }
+
+    /**
      * If this function is called successively without calling {@see #destroyConnection()}, only the
      * first call has any effect.
      *
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index bf15ce5..9aa60e3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -48,6 +48,7 @@ import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -57,6 +58,7 @@ import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.dataproxy.utils.NetworkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -381,7 +383,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         }
     }
 
-    private void formatMessagesAndSend(Map<String, String> commonAttrMap,
+    private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
             Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
             String strRemoteIP, MsgType msgType) throws MessageIDException {
 
@@ -425,6 +427,16 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     headers.put(AttributeConstants.DATA_TIME, String.valueOf(System.currentTimeMillis()));
                 }
 
+                String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND);
+                if (StringUtils.isNotEmpty(syncSend)) {
+                    headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend);
+                }
+
+                String partitionKey = commonAttrMap.get(AttributeConstants.MESSAGE_PARTITION_KEY);
+                if (StringUtils.isNotEmpty(partitionKey)) {
+                    headers.put(AttributeConstants.MESSAGE_PARTITION_KEY, partitionKey);
+                }
+
                 headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
                 headers.put(AttributeConstants.GROUP_ID,
                         streamIdEntry.getValue().get(0).getGroupId());
@@ -438,6 +450,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 byte[] data = inLongMsg.buildArray();
                 headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
 
+                headers.put(AttributeConstants.UNIQ_ID,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
                 String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
                 if (StringUtils.isNotEmpty(sequenceId)) {
                     StringBuilder sidBuilder = new StringBuilder();
@@ -448,7 +462,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
                 headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
                 Event event = EventBuilder.withBody(data, headers);
-
+                if (MessageUtils.isSyncSendForOrder(event)) {
+                    event = new OrderEvent(ctx, event);
+                }
                 long dtten = 0;
                 try {
                     dtten = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
@@ -539,32 +555,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 if (resultMap.containsKey(ConfigConstants.DECODER_ATTRS)) {
                     backattrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
                 }
-
-                int binTotalLen = 1 + 4 + 2 + 2;
-                if (null != backattrs) {
-                    binTotalLen += backattrs.length();
-                }
-
-                ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
-                binBuffer.writeInt(binTotalLen);
-                binBuffer.writeByte(msgType.getValue());
-
-                long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                byte[] uniq = new byte[4];
-                uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
-                uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
-                uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
-                uniq[3] = (byte) (uniqVal & 0xFF);
-                binBuffer.writeBytes(uniq);
-
-                if (null != backattrs) {
-                    binBuffer.writeShort(backattrs.length());
-                    binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
-                } else {
-                    binBuffer.writeShort(0x0);
-                }
-
-                binBuffer.writeShort(0xee01);
+                String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID);
+                ByteBuf binBuffer = MessageUtils.getResponsePackage(backattrs, msgType, uniqVal);
                 if (remoteChannel.isWritable()) {
                     remoteChannel.writeAndFlush(binBuffer);
                     logger.debug("Connection info: {} ; attr is {} ; uniqVal {}",
@@ -647,16 +639,23 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
 
                 updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
 
-                formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
+                formatMessagesAndSend(ctx, commonAttrMap, messageMap,
+                        strRemoteIP, msgType);
 
             } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
                 Map<String, String> headers = new HashMap<String, String>();
                 headers.put("msgtype", "filestatus");
                 headers.put(ConfigConstants.FILE_CHECK_DATA,
                         "true");
+                headers.put(AttributeConstants.UNIQ_ID,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
                 for (ProxyMessage message : msgList) {
                     byte[] body = message.getData();
                     Event event = EventBuilder.withBody(body, headers);
+                    if (MessageUtils.isSyncSendForOrder(commonAttrMap
+                            .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                        event = new OrderEvent(ctx, event);
+                    }
                     try {
                         processor.processEvent(event);
                         this.addMetric(true, body.length, event);
@@ -674,9 +673,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 headers.put("msgtype", "measure");
                 headers.put(ConfigConstants.FILE_CHECK_DATA,
                         "true");
+                headers.put(AttributeConstants.UNIQ_ID,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
                 for (ProxyMessage message : msgList) {
                     byte[] body = message.getData();
                     Event event = EventBuilder.withBody(body, headers);
+                    if (MessageUtils.isSyncSendForOrder(commonAttrMap
+                            .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                        event = new OrderEvent(ctx, event);
+                    }
                     try {
                         processor.processEvent(event);
                         this.addMetric(true, body.length, event);
@@ -689,8 +694,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 }
             }
             SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
-            responsePackage(ctx, commonAttrMap, resultMap, remoteChannel,
-                    remoteSocketAddress, msgType);
+            if (!MessageUtils.isSyncSendForOrder(commonAttrMap
+                    .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                responsePackage(ctx, commonAttrMap, resultMap, remoteChannel,
+                        remoteSocketAddress, msgType);
+            }
         } finally {
             cb.release();
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
new file mode 100644
index 0000000..02b83c8
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.source.MsgType;
+
+/** Static helpers for the sync-send (ordered) flag and the binary response frame. */
+public class MessageUtils {
+
+    /**
+     * Tells whether a syncSend attribute value requests ordered, synchronous sending.
+     * @param syncSend value of the syncSend attribute, may be null
+     * @return true only when the value is exactly "true"
+     */
+    public static boolean isSyncSendForOrder(String syncSend) {
+        return StringUtils.equals("true", syncSend);
+    }
+
+    /**
+     * Tells whether an event's syncSend header requests ordered, synchronous sending.
+     * @param event event whose headers are inspected
+     * @return true only when the header value is exactly "true"
+     */
+    public static boolean isSyncSendForOrder(Event event) {
+        String syncSend = event.getHeaders().get(AttributeConstants.MESSAGE_SYNC_SEND);
+        return isSyncSendForOrder(syncSend);
+    }
+
+    /**
+     * Builds the binary response frame.
+     * Layout: int body length, byte msg type, 4-byte uniq id, short attr length,
+     * UTF-8 attr bytes, short 0xee01 trailer.
+     * @param backattrs attributes to echo back, null means none
+     * @param msgType message type written after the length field
+     * @param sequenceId decimal uniq id, only its low 32 bits are written
+     * @return newly allocated buffer holding the frame
+     * @throws NumberFormatException if sequenceId is null or not a decimal long
+     */
+    public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+        // Parse before allocating so a malformed id cannot leak the buffer.
+        long uniqVal = Long.parseLong(sequenceId);
+        // Encode once: the frame holds UTF-8 bytes, whose count can exceed String.length().
+        byte[] attrs = (null == backattrs)
+                ? new byte[0] : backattrs.getBytes(StandardCharsets.UTF_8);
+        int binTotalLen = 1 + 4 + 2 + attrs.length + 2;
+
+        ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
+        binBuffer.writeInt(binTotalLen);
+        binBuffer.writeByte(msgType.getValue());
+        // Big-endian writeInt of the narrowed value emits the same 4 low-order bytes.
+        binBuffer.writeInt((int) uniqVal);
+        binBuffer.writeShort(attrs.length);
+        binBuffer.writeBytes(attrs);
+        binBuffer.writeShort(0xee01);
+        return binBuffer;
+    }
+
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index db86efd..579b07b 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -32,7 +32,7 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
     private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
 
     @Override
-    protected void decode(ChannelHandlerContext var1,
+    protected void decode(ChannelHandlerContext ctx,
             ByteBuf buffer, List<Object> out) throws Exception {
         buffer.markReaderIndex();
         // totallen
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index 3cfa2f9..b9b68a0 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -46,7 +46,7 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
     private static final Logger logger = LoggerFactory
             .getLogger(ProtocolEncoder.class);
 
-    protected void encode(ChannelHandlerContext var1,
+    protected void encode(ChannelHandlerContext ctx,
             EncodeObject message, List<Object> out) throws Exception {
         ByteBuf buf = null;
         try {