You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:43 UTC

[inlong] 01/07: [INLONG-6417][SDK] Support proxy-send mode (#6437)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ce8f194f72d67d7218bf7734bf832d9df3e093f5
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Tue Nov 8 15:47:47 2022 +0800

    [INLONG-6417][SDK] Support proxy-send mode (#6437)
---
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 376 +++++++++++++++++----
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |   3 +-
 2 files changed, 306 insertions(+), 73 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 2dc1c2ca5..4cba1a3ea 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -18,6 +18,7 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -95,12 +96,12 @@ public class DefaultMessageSender implements MessageSender {
     /**
      * generate by cluster id
      *
-     * @param configure         - sender
+     * @param configure - sender
      * @param selfDefineFactory - sender factory
      * @return - sender
      */
     public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
-                                                                 ThreadFactory selfDefineFactory) throws Exception {
+            ThreadFactory selfDefineFactory) throws Exception {
         ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
                 Utils.getLocalIp(), null);
         proxyConfigManager.setGroupId(configure.getGroupId());
@@ -188,35 +189,61 @@ public class DefaultMessageSender implements MessageSender {
 
     @Deprecated
     public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         return sender.syncSendMessage(new EncodeObject(body, attributes,
                 idGenerator.getNextId()), msgUUID, timeout, timeUnit);
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
+        return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * sync send single message
+     *
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, 1);
 
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
+
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
-                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, "");
+                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
+            if (isProxySend) {
+                proxySend = "&" + proxySend;
+            }
             if (isCompressEnd) {
                 return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
-                        + streamId + "&dt=" + dt + "&cp=snappy", idGenerator.getNextId(), this.getMsgtype(),
-                        true, groupId), msgUUID, timeout, timeUnit);
+                        + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(),
+                        this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
-                                + streamId + "&dt=" + dt, idGenerator.getNextId(), this.getMsgtype(), false, groupId),
-                        msgUUID, timeout, timeUnit);
+                return sender.syncSendMessage(new EncodeObject(body,
+                        "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend,
+                        idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
             }
         }
 
@@ -224,7 +251,26 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+        return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * sync send single message
+     *
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
 
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -232,6 +278,9 @@ public class DefaultMessageSender implements MessageSender {
         }
         addIndexCnt(groupId, streamId, 1);
 
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -260,27 +309,54 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
+        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * sync send a batch of messages
+     *
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, bodyList.size());
 
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_SYNC_SEND + "=true";
+        }
+
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
                     isGroupIdTransfer, dt / 1000,
-                    idGenerator.getNextInt(), groupId, streamId, "");
+                    idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
+            if (isProxySend) {
+                proxySend = "&" + proxySend;
+            }
             if (isCompress) {
                 return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(),
-                        this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
+                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(),
+                        + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(),
+                        this.getMsgtype(),
                         false, groupId), msgUUID, timeout, timeUnit);
             }
         }
@@ -288,13 +364,35 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
-                                  String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * sync send a batch of messages
+     *
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
+                extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, bodyList.size());
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
@@ -322,37 +420,62 @@ public class DefaultMessageSender implements MessageSender {
 
     @Deprecated
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes,
-                                 String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
         sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()),
                 callback, msgUUID, timeout, timeUnit);
     }
 
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body,
-                                 String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * async send single message
+     *
+     * @param callback callback can be null
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, 1);
 
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport,
                     isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
-                    groupId, streamId, "");
+                    groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
+                if (isProxySend) {
+                    proxySend = "&" + proxySend;
+                }
                 sender.asyncSendMessage(new EncodeObject(body, "groupId="
-                                + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+                                + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy" + proxySend,
                                 idGenerator.getNextId(), this.getMsgtype(), true, groupId),
                         callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
                         new EncodeObject(body, "groupId=" + groupId + "&streamId="
-                                + streamId + "&dt=" + dt, idGenerator.getNextId(),
+                                + streamId + "&dt=" + dt + proxySend, idGenerator.getNextId(),
                                 this.getMsgtype(), false, groupId), callback,
                         msgUUID, timeout, timeUnit);
             }
@@ -360,14 +483,38 @@ public class DefaultMessageSender implements MessageSender {
 
     }
 
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId,
-                                 long dt, String msgUUID, long timeout, TimeUnit timeUnit,
-                                 Map<String, String> extraAttrMap) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
+            throws ProxysdkException {
+        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * async send single message
+     *
+     * @param callback callback can be null
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend)
+            throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, 1);
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -392,45 +539,97 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
+    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
+            long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * async send a batch of messages
+     *
+     * @param callback callback can be null
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
-                                 String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, bodyList.size());
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(), isCompress,
                     isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
-                    groupId, streamId, "");
+                    groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
+            if (isProxySend) {
+                proxySend = "&" + proxySend;
+            }
             if (isCompress) {
                 sender.asyncSendMessage(
                         new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
-                                + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(),
+                                + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
+                                idGenerator.getNextId(),
                                 this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
-                        new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt
-                                + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(),
-                                false, groupId), callback, msgUUID, timeout, timeUnit);
+                        new EncodeObject(bodyList,
+                                "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cnt=" + bodyList.size()
+                                        + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId),
+                        callback, msgUUID, timeout, timeUnit);
             }
         }
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-                                 List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit,
-                                 Map<String, String> extraAttrMap) throws ProxysdkException {
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) throws ProxysdkException {
+        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * async send a batch of messages
+     *
+     * @param callback callback can be null
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout, measured in {@code timeUnit}
+     * @param timeUnit time unit of {@code timeout}
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(SendMessageCallback callback,
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
+                extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, bodyList.size());
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
@@ -465,28 +664,60 @@ public class DefaultMessageSender implements MessageSender {
      * @throws ProxysdkException
      */
     @Override
-    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body,
-                                 SendMessageCallback callback) throws ProxysdkException {
+    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback)
+            throws ProxysdkException {
         this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
                 idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
     }
 
     /**
-     * asyncSendMessage
+     * async send single message
      *
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param bodyList
-     * @param callback
+     * @param inlongGroupId groupId
+     * @param inlongStreamId streamId
+     * @param body a single message
+     * @param callback callback can be null
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback,
+            boolean isProxySend) throws ProxysdkException {
+        this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
+                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend);
+    }
+
+    /**
+     * async send a batch of messages
+     *
+     * @param inlongGroupId groupId
+     * @param inlongStreamId streamId
+     * @param bodyList list of messages
+     * @param callback callback can be null
      * @throws ProxysdkException
      */
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
-                                 SendMessageCallback callback) throws ProxysdkException {
+            SendMessageCallback callback) throws ProxysdkException {
         this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
                 idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
     }
 
+    /**
+     * async send a batch of messages
+     *
+     * @param inlongGroupId groupId
+     * @param inlongStreamId streamId
+     * @param bodyList list of messages
+     * @param callback callback can be null
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
+            SendMessageCallback callback, boolean isProxySend) throws ProxysdkException {
+        this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
+                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend);
+    }
+
     private void addIndexCnt(String groupId, String streamId, long cnt) {
         try {
             String key = groupId + "|" + streamId;
@@ -501,10 +732,10 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
-                                     String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
-                                     long timeout, TimeUnit timeUnit,
-                                     Map<String, String> extraAttrMap) throws ProxysdkException {
+    @Deprecated
+    public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, String streamId,
+            long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -523,9 +754,9 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
-                                 String streamId, long dt, int sid, String ip, String msgUUID,
-                                 long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
+    @Deprecated
+    private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid,
+            String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -538,23 +769,23 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
-                                      long dt, int sid, String ip, String msgUUID,
-                                      long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    @Deprecated
+    public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
+            int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
         asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout,
                 timeUnit, "minute");
     }
 
-    public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
-                                     String streamId, long dt, int sid, String msgUUID,
-                                     long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    @Deprecated
+    public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
+            int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
         asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit,
                 "file");
     }
 
-    public String sendMessageData(List<byte[]> bodyList, String groupId,
-                                  String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+    @Deprecated
+    public String sendMessageData(List<byte[]> bodyList, String groupId, String streamId, long dt, int sid,
+            boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -574,9 +805,9 @@ public class DefaultMessageSender implements MessageSender {
         return null;
     }
 
-    private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
-                              String msgUUID,
-                              long timeout, TimeUnit timeUnit, String messageKey) {
+    @Deprecated
+    private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
+            long timeout, TimeUnit timeUnit, String messageKey) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
@@ -589,14 +820,15 @@ public class DefaultMessageSender implements MessageSender {
         return null;
     }
 
-    public String sendMessageProxy(byte[] body, String groupId, String streamId,
-                                   long dt, int sid, String ip, String msgUUID,
-                                   long timeout, TimeUnit timeUnit) {
+    @Deprecated
+    public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
+            String msgUUID, long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
     }
 
+    @Deprecated
     public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
     }
 
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index df1c4939a..fd6b0f275 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -39,7 +39,8 @@ public class ProxyUtils {
     static {
         Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp",
                 "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
-                "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey");
+                "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey",
+                "proxySend", "errMsg", "errCode");
     }
 
     public static boolean isAttrKeysValid(Map<String, String> attrsMap) {