You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/31 05:12:30 UTC

[inlong] branch master updated: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side (#6325)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6749c733d [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side (#6325)
6749c733d is described below

commit 6749c733d7a1e22a7fc40b8fa2f775b0a8268fe0
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Oct 31 13:12:25 2022 +0800

    [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side (#6325)
---
 .../inlong/common/enums/DataProxyErrCode.java      |  63 +++
 .../dataproxy/consts/AttributeConstants.java       |   4 +
 .../inlong/dataproxy/consts/ConfigConstants.java   |   2 +
 .../dataproxy/sink/pulsar/PulsarClientService.java |   7 +-
 .../dataproxy/source/ServerMessageHandler.java     | 452 +++++++++------------
 .../inlong/dataproxy/utils/MessageUtils.java       | 199 ++++++++-
 6 files changed, 441 insertions(+), 286 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
new file mode 100644
index 000000000..4a08319f1
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+public enum DataProxyErrCode {
+
+    SUCCESS(0, "Ok"),
+
+    UNSUPPORTED_MSGTYPE(1, "Unsupported msgType"),
+    EMPTY_MSG(2, "Empty message"),
+    UNSUPPORTED_EXTENDFIELD_VALUE(3,
+            "Unsupported extend field value"),
+    UNCONFIGURED_GROUPID_OR_STREAMID(4,
+            "Un-configured groupId or streamId"),
+
+    UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
+
+    private final int errCode;
+    private final String errMsg;
+
+    DataProxyErrCode(int errorCode, String errorMsg) {
+        this.errCode = errorCode;
+        this.errMsg = errorMsg;
+    }
+
+    public static DataProxyErrCode valueOf(int value) {
+        for (DataProxyErrCode msgErrCode : DataProxyErrCode.values()) {
+            if (msgErrCode.getErrCode() == value) {
+                return msgErrCode;
+            }
+        }
+
+        return UNKNOWN_ERROR;
+    }
+
+    public int getErrCode() {
+        return errCode;
+    }
+
+    public String getErrCodeStr() {
+        return String.valueOf(errCode);
+    }
+
+    public String getErrMsg() {
+        return errMsg;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index 8cb56c426..45d502750 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -90,4 +90,8 @@ public class AttributeConstants {
 
     public static final String MESSAGE_IS_ACK = "isAck";
 
+    public static final String MESSAGE_PROCESS_ERRCODE = "errCode";
+
+    public static final String MESSAGE_PROCESS_ERRMSG = "errMsg";
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 1ec268016..1ede782ea 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -100,6 +100,8 @@ public class ConfigConstants {
     public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
     public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
     public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
+    public static final String SOURCE_NO_TOPIC_ACCEPT = "source.topic.notfound.accept";
+    public static final String SINK_NO_TOPIC_RESEND = "sink.topic.notfound.resend";
 
     public static final String DECODER_BODY = "body";
     public static final String DECODER_TOPICKEY = "topic_key";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index dc0a87b07..c7b0058c7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.dataproxy.sink.pulsar;
 
+import static org.apache.inlong.common.util.NetworkUtils.getLocalIp;
+
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -36,7 +38,6 @@ import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.EventStat;
-import org.apache.inlong.dataproxy.source.MsgType;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -243,7 +244,9 @@ public class PulsarClientService {
                     logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
                             inlongGroupId, inlongStreamId);
                 }
-                ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+                String attrs = ConfigConstants.DATAPROXY_IP_KEY
+                        + AttributeConstants.KEY_VALUE_SEPARATOR + getLocalIp();
+                ByteBuf binBuffer = MessageUtils.buildBinMsgRspPackage(attrs, sequenceId);
                 orderEvent.getCtx().writeAndFlush(binBuffer);
             });
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index a381f8c1c..3f96e1bf8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import static org.apache.inlong.common.util.NetworkUtils.getLocalIp;
 import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
 import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
 
@@ -30,9 +31,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -45,7 +44,7 @@ import org.apache.flume.event.EventBuilder;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.InLongMsg;
-import org.apache.inlong.common.util.NetworkUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.dataproxy.base.OrderEvent;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -247,97 +246,193 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         allChannels.remove(ctx.channel());
     }
 
-    private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
-                                  Map<String, String> attrMap, AtomicReference<String> topicInfo) {
-        String groupId = message.getGroupId();
-        String streamId = message.getStreamId();
-        if (null != groupId) {
-            // get configured group Id
-            String from = commonAttrMap.get(AttributeConstants.FROM);
-            if ("dc".equals(from)) {
-                String dcInterfaceId = message.getStreamId();
-                if (StringUtils.isNotEmpty(dcInterfaceId)
-                        && configManager.getDcMappingProperties()
-                        .containsKey(dcInterfaceId.trim())) {
-                    groupId = configManager.getDcMappingProperties()
-                            .get(dcInterfaceId.trim()).trim();
-                    message.setGroupId(groupId);
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg == null) {
+            logger.debug("Get null msg, just skip!");
+            return;
+        }
+        ByteBuf cb = (ByteBuf) msg;
+        try {
+            Channel remoteChannel = ctx.channel();
+            String strRemoteIP = getRemoteIp(remoteChannel);
+            int len = cb.readableBytes();
+            if (len == 0 && this.filterEmptyMsg) {
+                logger.debug("Get empty msg from {}, just skip!", strRemoteIP);
+                return;
+            }
+            // parse message
+            Map<String, Object> resultMap = null;
+            final long msgRcvTime = System.currentTimeMillis();
+            try {
+                resultMap = serviceDecoder.extractData(cb,
+                        strRemoteIP, msgRcvTime, remoteChannel);
+                if (resultMap == null || resultMap.isEmpty()) {
+                    logger.debug("Parse message result is null, from {}", strRemoteIP);
+                    return;
                 }
+            } catch (MessageIDException ex) {
+                logger.error("MessageIDException ex = {}", ex);
+                throw new IOException(ex.getCause());
             }
-            // get configured topic name
-            String configTopic = MessageUtils.getTopic(
-                    configManager.getTopicProperties(), groupId, streamId);
-            if (StringUtils.isNotEmpty(configTopic)) {
-                topicInfo.set(configTopic.trim());
+            // get msgType from parsed result
+            MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
+            // get attribute data from parsed result
+            Map<String, String> commonAttrMap =
+                    (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+            if (commonAttrMap == null) {
+                commonAttrMap = new HashMap<>();
             }
-            // get configured m value
-            Map<String, String> mxValue =
-                    configManager.getMxPropertiesMaps().get(groupId);
-            if (mxValue != null && mxValue.size() != 0) {
-                message.getAttributeMap().putAll(mxValue);
-            } else {
-                message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
+            // process heartbeat message
+            if (MsgType.MSG_HEARTBEAT.equals(msgType)
+                    || MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+                return;
             }
-        } else {
-            String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
-            String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
-            String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
-            // get configured groupId and steamId by numbers
-            if (configManager.getGroupIdMappingProperties() != null
-                    && configManager.getStreamIdMappingProperties() != null) {
-                groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
-                streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
-                        ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
-                if (groupId != null && streamId != null) {
-                    String enableTrans =
-                            (configManager.getGroupIdEnableMappingProperties() == null)
-                                    ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
-                    if (("TRUE".equalsIgnoreCase(enableTrans)
-                            && "TRUE".equalsIgnoreCase(num2name))) {
-                        String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
-                        message.setData(newBinMsg(message.getData(), extraAttr));
-                    }
-                    // reset groupId and streamId to message and attrMap
-                    attrMap.put(AttributeConstants.GROUP_ID, groupId);
-                    attrMap.put(AttributeConstants.STREAM_ID, streamId);
-                    message.setGroupId(groupId);
-                    message.setStreamId(streamId);
-                    // get configured topic name
-                    String configTopic = MessageUtils.getTopic(
-                            configManager.getTopicProperties(), groupId, streamId);
-                    if (StringUtils.isNotEmpty(configTopic)) {
-                        topicInfo.set(configTopic.trim());
-                    }
-                }
+            // reject unsupported messages
+            if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
+                    || commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+                commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+                        DataProxyErrCode.UNSUPPORTED_EXTENDFIELD_VALUE.getErrCodeStr());
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+                return;
+            }
+            // check message's groupId, streamId, topic
+            List<ProxyMessage> msgList =
+                    (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
+            if (msgList == null) {
+                commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+                        DataProxyErrCode.EMPTY_MSG.getErrCodeStr());
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+                return;
+            }
+            // transfer message data
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+                    new HashMap<>(msgList.size());
+            if (!convertMsgList(msgList, commonAttrMap, messageMap, strRemoteIP)) {
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+                return;
             }
+            // send messages to channel
+            formatMessagesAndSend(ctx, commonAttrMap,
+                    messageMap, strRemoteIP, msgType, msgRcvTime);
+            // return response
+            if (!MessageUtils.isSyncSendForOrder(
+                    commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+            }
+        } finally {
+            cb.release();
         }
     }
 
-    private boolean updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
-                                  Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-                                  String strRemoteIP) {
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        logger.error("exception caught cause = {}", cause);
+        monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
+        ctx.close();
+    }
+
+    /**
+     * Complete the message content and convert the proxy messages to a map
+     *
+     * @param msgList  the message list
+     * @param commonAttrMap common attribute map
+     * @param messageMap    output map: topic to (streamId to message list)
+     * @param strRemoteIP   remote ip
+     *
+     * @return  false if a message has no configured topic and is not accepted, true otherwise
+     */
+    private boolean convertMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
+                                   Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+                                   String strRemoteIP) {
         for (ProxyMessage message : msgList) {
-            String topic = this.defaultTopic;
-            Map<String, String> attrMap = message.getAttributeMap();
-            AtomicReference<String> topicInfo = new AtomicReference<>(topic);
-            checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
+            String configTopic = null;
             String groupId = message.getGroupId();
             String streamId = message.getStreamId();
+            // get topic by groupId and streamId
+            if (null == groupId) {
+                String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
+                String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
+                String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
+                // get configured groupId and streamId by numbers
+                if (configManager.getGroupIdMappingProperties() != null
+                        && configManager.getStreamIdMappingProperties() != null) {
+                    groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
+                    streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
+                            ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+                    if (groupId != null && streamId != null) {
+                        String enableTrans =
+                                (configManager.getGroupIdEnableMappingProperties() == null)
+                                        ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+                        if (("TRUE".equalsIgnoreCase(enableTrans)
+                                && "TRUE".equalsIgnoreCase(num2name))) {
+                            String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
+                            message.setData(newBinMsg(message.getData(), extraAttr));
+                        }
+                        // reset groupId and streamId to message and attrMap
+                        message.setGroupId(groupId);
+                        message.setStreamId(streamId);
+                        // get configured topic name
+                        configTopic = MessageUtils.getTopic(
+                                configManager.getTopicProperties(), groupId, streamId);
+                    }
+                }
+            } else {
+                // get configured group Id
+                String from = commonAttrMap.get(AttributeConstants.FROM);
+                if ("dc".equals(from)) {
+                    String dcInterfaceId = message.getStreamId();
+                    if (StringUtils.isNotEmpty(dcInterfaceId)
+                            && configManager.getDcMappingProperties()
+                            .containsKey(dcInterfaceId.trim())) {
+                        groupId = configManager.getDcMappingProperties()
+                                .get(dcInterfaceId.trim()).trim();
+                        message.setGroupId(groupId);
+                    }
+                }
+                // get configured m value
+                Map<String, String> mxValue =
+                        configManager.getMxPropertiesMaps().get(groupId);
+                if (mxValue != null && mxValue.size() != 0) {
+                    message.getAttributeMap().putAll(mxValue);
+                } else {
+                    message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
+                }
+                // get configured topic name
+                configTopic = MessageUtils.getTopic(
+                        configManager.getTopicProperties(), groupId, streamId);
+            }
+            // check topic configure
+            if (StringUtils.isEmpty(configTopic)) {
+                String acceptMsg =
+                        configManager.getCommonProperties().getOrDefault(
+                                ConfigConstants.SOURCE_NO_TOPIC_ACCEPT, "false");
+                if ("true".equalsIgnoreCase(acceptMsg)) {
+                    configTopic = this.defaultTopic;
+                } else {
+                    commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+                            DataProxyErrCode.UNCONFIGURED_GROUPID_OR_STREAMID.getErrCodeStr());
+                    logger.debug("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}",
+                            groupId, streamId);
+                    return false;
+                }
+            }
             if (streamId == null) {
                 streamId = "";
                 message.setStreamId(streamId);
             }
-            topic = topicInfo.get();
-            if (StringUtils.isEmpty(topic)) {
-                logger.warn("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}",
-                        groupId, streamId);
-            }
             // append topic
-            message.setTopic(topic);
+            message.setTopic(configTopic);
             commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
             // add ProxyMessage
             HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
-                    .computeIfAbsent(topic, k -> new HashMap<>());
+                    .computeIfAbsent(configTopic, k -> new HashMap<>());
             List<ProxyMessage> streamIdMsgList = streamIdMsgMap
                     .computeIfAbsent(streamId, k -> new ArrayList<>());
             streamIdMsgList.add(message);
@@ -345,6 +440,16 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         return true;
     }
 
+    /**
+     * Format the messages into events and send them to the channel
+     *
+     * @param ctx  client connect
+     * @param commonAttrMap common attribute map
+     * @param messageMap    messages grouped by topic, then by streamId
+     * @param strRemoteIP   remote ip
+     * @param msgType    the message type
+     * @param msgRcvTime  the received time
+     */
     private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
                                        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
                                        String strRemoteIP, MsgType msgType, long msgRcvTime) throws MessageIDException {
@@ -439,7 +544,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         .append(topicEntry.getKey()).append(SEPARATOR)
                         .append(streamIdEntry.getKey()).append(SEPARATOR)
                         .append(strRemoteIP).append(SEPARATOR)
-                        .append(NetworkUtils.getLocalIp()).append(SEPARATOR)
+                        .append(getLocalIp()).append(SEPARATOR)
                         .append(orderType).append(SEPARATOR)
                         .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEPARATOR)
                         .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
@@ -451,7 +556,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                             recordMsgCnt, 1, data.length, 0);
                     strBuff.delete(0, strBuff.length());
                 } catch (Throwable ex) {
-                    logger.error("Error writting to channel,data will discard.", ex);
+                    logger.error("Error writting to channel, data will discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
                     monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, recordMsgCnt);
                     this.addStatistics(false, data.length, event);
@@ -462,199 +567,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         }
     }
 
-    private void responsePackage(Map<String, String> commonAttrMap,
-                                 Map<String, Object> resultMap,
-                                 Channel remoteChannel,
-                                 MsgType msgType) throws Exception {
-        String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
-        if (isAck == null || "true".equals(isAck)) {
-            if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
-                    .equals(msgType)
-                    || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR
-                    .equals(msgType)) {
-                byte[] backAttr = mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
-                byte[] backBody = null;
-
-                if (backAttr != null && !new String(backAttr, StandardCharsets.UTF_8).isEmpty()) {
-                    if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
-
-                        backBody = (byte[]) resultMap.get(ConfigConstants.DECODER_BODY);
-                    } else {
-
-                        backBody = new byte[]{50};
-                    }
-                    int backTotalLen = 1 + 4 + backBody.length + 4 + backAttr.length;
-                    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
-                    buffer.writeInt(backTotalLen);
-                    buffer.writeByte(msgType.getValue());
-                    buffer.writeInt(backBody.length);
-                    buffer.writeBytes(backBody);
-                    buffer.writeInt(backAttr.length);
-                    buffer.writeBytes(backAttr);
-                    if (remoteChannel.isWritable()) {
-                        remoteChannel.writeAndFlush(buffer);
-                    } else {
-                        String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
-                        logger.warn(
-                                "the send buffer1 is full, so disconnect it!please check remote client"
-                                        + "; Connection info:"
-                                        + remoteChannel + ";attr is " + backAttrStr);
-                        buffer.release();
-                        throw new Exception(new Throwable(
-                                "the send buffer1 is full, so disconnect it!please check remote client"
-                                        +
-                                        "; Connection info:" + remoteChannel + ";attr is "
-                                        + backAttrStr));
-                    }
-                }
-            } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-                String backAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
-                String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID);
-                ByteBuf binBuffer = MessageUtils.getResponsePackage(backAttrs, msgType, uniqVal);
-                if (remoteChannel.isWritable()) {
-                    remoteChannel.writeAndFlush(binBuffer);
-                    logger.debug("Connection info: {} ; attr is {} ; uniqVal {}",
-                            remoteChannel, backAttrs, uniqVal);
-                } else {
-                    binBuffer.release();
-                    logger.warn(
-                            "the send buffer2 is full, so disconnect it!please check remote client"
-                                    + "; Connection info:" + remoteChannel + ";attr is "
-                                    + backAttrs);
-                    throw new Exception(new Throwable(
-                            "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
-                                    + remoteChannel + ";attr is " + backAttrs));
-                }
-            }
-        }
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg == null) {
-            logger.error("Get null msg, just skip!");
-            return;
-        }
-        ByteBuf cb = (ByteBuf) msg;
-        try {
-            Channel remoteChannel = ctx.channel();
-            String strRemoteIP = getRemoteIp(remoteChannel);
-            int len = cb.readableBytes();
-            if (len == 0 && this.filterEmptyMsg) {
-                logger.warn("Get empty msg from {}, just skip!", strRemoteIP);
-                return;
-            }
-            // parse message
-            Map<String, Object> resultMap = null;
-            final long msgRcvTime = System.currentTimeMillis();
-            try {
-                resultMap = serviceDecoder.extractData(cb,
-                        strRemoteIP, msgRcvTime, remoteChannel);
-                if (resultMap == null || resultMap.isEmpty()) {
-                    logger.info("Parse message result is null, from {}", strRemoteIP);
-                    return;
-                }
-            } catch (MessageIDException ex) {
-                logger.error("MessageIDException ex = {}", ex);
-                throw new IOException(ex.getCause());
-            }
-            // process message by msgType
-            MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
-            if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
-                ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
-                heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
-                remoteChannel.writeAndFlush(heartbeatBuffer);
-                return;
-            }
-            // process heart beat 8
-            if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-                return;
-            }
-            // process data message
-            Map<String, String> commonAttrMap =
-                    (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
-            if (commonAttrMap == null) {
-                commonAttrMap = new HashMap<String, String>();
-            }
-            List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
-            boolean checkMessageTopic = true;
-            if (msgList != null) {
-                if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-                    // process file check data
-                    Map<String, String> headers = new HashMap<String, String>();
-                    headers.put("msgtype", "filestatus");
-                    headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
-                    headers.put(AttributeConstants.UNIQ_ID,
-                            commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                    for (ProxyMessage message : msgList) {
-                        byte[] body = message.getData();
-                        Event event = EventBuilder.withBody(body, headers);
-                        if (MessageUtils.isSyncSendForOrder(commonAttrMap
-                                .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
-                            event = new OrderEvent(ctx, event);
-                        }
-                        try {
-                            processor.processEvent(event);
-                            this.addStatistics(true, body.length, event);
-                        } catch (Throwable ex) {
-                            logger.error("Error writing to controller,data will discard.", ex);
-                            this.addStatistics(false, body.length, event);
-                            throw new ChannelException(
-                                    "Process Controller Event error can't write event to channel.");
-                        }
-                    }
-                } else if (commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-                    // process minute check data
-                    Map<String, String> headers = new HashMap<String, String>();
-                    headers.put("msgtype", "measure");
-                    headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
-                    headers.put(AttributeConstants.UNIQ_ID,
-                            commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                    for (ProxyMessage message : msgList) {
-                        byte[] body = message.getData();
-                        Event event = EventBuilder.withBody(body, headers);
-                        if (MessageUtils.isSyncSendForOrder(commonAttrMap
-                                .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
-                            event = new OrderEvent(ctx, event);
-                        }
-                        try {
-                            processor.processEvent(event);
-                            this.addStatistics(true, body.length, event);
-                        } catch (Throwable ex) {
-                            logger.error("Error writing to controller,data will discard.", ex);
-                            this.addStatistics(false, body.length, event);
-                            throw new ChannelException(
-                                    "Process Controller Event error can't write event to channel.");
-                        }
-                    }
-                } else {
-                    // process message data
-                    Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
-                            new HashMap<>(msgList.size());
-                    checkMessageTopic = updateMsgList(msgList,
-                            commonAttrMap, messageMap, strRemoteIP);
-                    if (checkMessageTopic) {
-                        formatMessagesAndSend(ctx, commonAttrMap,
-                                messageMap, strRemoteIP, msgType, msgRcvTime);
-                    }
-                }
-            }
-            if (!checkMessageTopic || !MessageUtils.isSyncSendForOrder(commonAttrMap
-                    .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
-                responsePackage(commonAttrMap, resultMap, remoteChannel, msgType);
-            }
-        } finally {
-            cb.release();
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        logger.error("exception caught cause = {}", cause);
-        monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
-        ctx.close();
-    }
-
     /**
      * add statistics information
      *
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index d8835658d..1d268415a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -15,13 +15,17 @@
 
 package org.apache.inlong.dataproxy.utils;
 
+import static org.apache.inlong.common.util.NetworkUtils.getLocalIp;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Event;
+import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -30,6 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MessageUtils {
+    // log print count
+    private static final LogCounter logCounter =
+            new LogCounter(10, 100000, 30 * 1000);
 
     private static final Logger logger = LoggerFactory.getLogger(MessageUtils.class);
 
@@ -39,7 +46,7 @@ public class MessageUtils {
      * @return true/false
      */
     public static boolean isSyncSendForOrder(String syncSend) {
-        if (StringUtils.isNotEmpty(syncSend) && "true".equals(syncSend)) {
+        if (StringUtils.isNotEmpty(syncSend) && "true".equalsIgnoreCase(syncSend)) {
             return true;
         }
         return false;
@@ -56,22 +63,150 @@ public class MessageUtils {
     }
 
     /**
-     * Convert String to ByteBuf
+     *  Return response to client in source
+     * @param commonAttrMap attribute map
+     * @param resultMap     result map
+     * @param remoteChannel client channel
+     * @param msgType       the message type
+     */
+    public static void sourceReturnRspPackage(Map<String, String> commonAttrMap,
+                                              Map<String, Object> resultMap,
+                                              Channel remoteChannel,
+                                              MsgType msgType) throws Exception {
+        ByteBuf binBuffer;
+        String origAttrs = null;
+        final StringBuilder strBuff = new StringBuilder(512);
+        // build message bytes
+        if (MsgType.MSG_UNKNOWN.equals(msgType)) {
+            if (logCounter.shouldPrint()) {
+                logger.warn("Unknown msgType message from {}, discard it!", remoteChannel);
+            }
+            return;
+        }
+        if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+            binBuffer = buildHeartBeatMsgRspPackage();
+        } else {
+            // check whether return response message
+            String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+            if ("false".equalsIgnoreCase(isAck)) {
+                return;
+            }
+            origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
+            // check whether channel is writable
+            if (!remoteChannel.isWritable()) {
+                strBuff.append("Send buffer is full1, so disconnect ")
+                        .append(remoteChannel).append(", attr is ").append(origAttrs);
+                if (logCounter.shouldPrint()) {
+                    logger.warn(strBuff.toString());
+                }
+                throw new Exception(strBuff.toString());
+            }
+            // build return attribute string
+            String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+            String errMsg = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+            strBuff.append(ConfigConstants.DATAPROXY_IP_KEY)
+                    .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(getLocalIp());
+            if (StringUtils.isNotEmpty(errCode)) {
+                strBuff.append(AttributeConstants.SEPARATOR)
+                        .append(AttributeConstants.MESSAGE_PROCESS_ERRCODE)
+                        .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errCode);
+            }
+            if (StringUtils.isNotEmpty(errMsg)) {
+                strBuff.append(AttributeConstants.SEPARATOR)
+                        .append(AttributeConstants.MESSAGE_PROCESS_ERRMSG)
+                        .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errMsg);
+            }
+            if (StringUtils.isNotEmpty(origAttrs)) {
+                strBuff.append(AttributeConstants.SEPARATOR).append(origAttrs);
+            }
+            String destAttrs = strBuff.toString();
+            // build response message bytes
+            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+                binBuffer = buildBinMsgRspPackage(destAttrs,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
+            } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+                binBuffer = buildHBRspPackage(destAttrs,
+                        (Byte)resultMap.get(ConfigConstants.VERSION_TYPE), 0);
+            } else {
+                // MsgType.MSG_ACK_SERVICE.equals(msgType)
+                // MsgType.MSG_ORIGINAL_RETURN.equals(msgType)
+                // MsgType.MSG_MULTI_BODY.equals(msgType)
+                // MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)
+                binBuffer = buildDefMsgRspPackage(msgType, destAttrs);
+            }
+        }
+        // send response message
+        if (remoteChannel.isWritable()) {
+            remoteChannel.writeAndFlush(binBuffer);
+        } else {
+            // release allocated ByteBuf
+            binBuffer.release();
+            strBuff.delete(0, strBuff.length());
+            strBuff.append("Send buffer is full2, so disconnect ")
+                    .append(remoteChannel).append(", attr is ").append(origAttrs);
+            if (logCounter.shouldPrint()) {
+                logger.warn(strBuff.toString());
+            }
+            throw new Exception(strBuff.toString());
+        }
+    }
+
+    /**
+     * Build heartbeat(1)-msg response message ByteBuf
+     *
+     * @return ByteBuf
+     */
+    private static ByteBuf buildHeartBeatMsgRspPackage() {
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
+        // magic data
+        buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+        return buffer;
+    }
+
+    /**
+     * Build default-msg response message ByteBuf
      *
-     * @param backattrs
-     * @param msgType message type
+     * @param msgType  the message type
+     * @param attrs    the return attribute
+     * @return ByteBuf
+     */
+    private static ByteBuf buildDefMsgRspPackage(MsgType msgType, String attrs) {
+        int attrsLen = 0;
+        int bodyLen = 0;
+        if (attrs != null) {
+            attrsLen = attrs.length();
+        }
+        // backTotalLen = msgType + bodyLen + body + attrsLen + attrs
+        int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
+        buffer.writeInt(backTotalLen);
+        buffer.writeByte(msgType.getValue());
+        buffer.writeInt(bodyLen);
+        buffer.writeInt(attrsLen);
+        if (attrsLen > 0) {
+            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+        }
+        return buffer;
+    }
+
+    /**
+     * Build bin-msg response message ByteBuf
+     *
+     * @param attrs   the return attribute
      * @param sequenceId sequence Id
      * @return ByteBuf
      */
-    public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+    public static ByteBuf buildBinMsgRspPackage(String attrs, String sequenceId) {
+        // calculate total length
+        // binTotalLen = msgType + uniq + attrsLen + attrs + magic
         int binTotalLen = 1 + 4 + 2 + 2;
-        if (null != backattrs) {
-            binTotalLen += backattrs.length();
+        if (null != attrs) {
+            binTotalLen += attrs.length();
         }
+        // allocate buffer and write fields
         ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
         binBuffer.writeInt(binTotalLen);
-        binBuffer.writeByte(msgType.getValue());
-
+        binBuffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
         long uniqVal = Long.parseLong(sequenceId);
         byte[] uniq = new byte[4];
         uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
@@ -79,10 +214,46 @@ public class MessageUtils {
         uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
         uniq[3] = (byte) (uniqVal & 0xFF);
         binBuffer.writeBytes(uniq);
+        if (null != attrs) {
+            binBuffer.writeShort(attrs.length());
+            binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+        } else {
+            binBuffer.writeShort(0x0);
+        }
+        binBuffer.writeShort(0xee01);
+        return binBuffer;
+    }
 
-        if (null != backattrs) {
-            binBuffer.writeShort(backattrs.length());
-            binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+    /**
+     * Build heartbeat response message ByteBuf
+     *
+     * @param attrs     the attribute string
+     * @param version   the version
+     * @param loadValue the node load value
+     * @return ByteBuf
+     */
+    private static ByteBuf buildHBRspPackage(String attrs, byte version, int loadValue) {
+        // calculate total length
+        // binTotalLen = msgType + dataTime + version + bodyLen + body + attrsLen + attrs + magic
+        int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;
+        if (null != attrs) {
+            binTotalLen += attrs.length();
+        }
+        // check load value
+        if (loadValue == 0 || loadValue == (-1)) {
+            loadValue = 0xffff;
+        }
+        // allocate buffer and write fields
+        ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
+        binBuffer.writeInt(binTotalLen);
+        binBuffer.writeByte(MsgType.MSG_BIN_HEARTBEAT.getValue());
+        binBuffer.writeInt((int) (System.currentTimeMillis() / 1000));
+        binBuffer.writeByte(version);
+        binBuffer.writeInt(2);
+        binBuffer.writeShort(loadValue);
+        if (null != attrs) {
+            binBuffer.writeShort(attrs.length());
+            binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
         } else {
             binBuffer.writeShort(0x0);
         }
@@ -104,8 +275,8 @@ public class MessageUtils {
             }
         }
         if (logger.isDebugEnabled()) {
-            logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}", groupId, streamId,
-                    topic);
+            logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}",
+                    groupId, streamId, topic);
         }
         return topic;
     }