You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:43 UTC
[inlong] 01/07: [INLONG-6417][SDK] Support proxy-send mode (#6437)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ce8f194f72d67d7218bf7734bf832d9df3e093f5
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Tue Nov 8 15:47:47 2022 +0800
[INLONG-6417][SDK] Support proxy-send mode (#6437)
---
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 376 +++++++++++++++++----
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 3 +-
2 files changed, 306 insertions(+), 73 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 2dc1c2ca5..4cba1a3ea 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -95,12 +96,12 @@ public class DefaultMessageSender implements MessageSender {
/**
* generate by cluster id
*
- * @param configure - sender
+ * @param configure - sender
* @param selfDefineFactory - sender factory
* @return - sender
*/
public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
- ThreadFactory selfDefineFactory) throws Exception {
+ ThreadFactory selfDefineFactory) throws Exception {
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
Utils.getLocalIp(), null);
proxyConfigManager.setGroupId(configure.getGroupId());
@@ -188,35 +189,61 @@ public class DefaultMessageSender implements MessageSender {
@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sender.syncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()), msgUUID, timeout, timeUnit);
}
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
+ return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+ }
+
+ /**
+ * sync send single message
+ *
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, 1);
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
+
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
- isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, "");
+ isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompressEnd) {
return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
- + streamId + "&dt=" + dt + "&cp=snappy", idGenerator.getNextId(), this.getMsgtype(),
- true, groupId), msgUUID, timeout, timeUnit);
+ + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(),
+ this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
- + streamId + "&dt=" + dt, idGenerator.getNextId(), this.getMsgtype(), false, groupId),
- msgUUID, timeout, timeUnit);
+ return sender.syncSendMessage(new EncodeObject(body,
+ "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend,
+ idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
@@ -224,7 +251,26 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * sync send single message
+ *
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -232,6 +278,9 @@ public class DefaultMessageSender implements MessageSender {
}
addIndexCnt(groupId, streamId, 1);
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -260,27 +309,54 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
+ return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+ }
+
+ /**
+ * sync send a batch of messages
+ *
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, bodyList.size());
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
+
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
isGroupIdTransfer, dt / 1000,
- idGenerator.getNextInt(), groupId, streamId, "");
+ idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompress) {
return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(),
- this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
+ + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(),
+ + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(),
+ this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
}
}
@@ -288,13 +364,35 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * sync send a batch of messages
+ *
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @return SendResult.OK means success
+ */
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
+ extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, bodyList.size());
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
@@ -322,37 +420,62 @@ public class DefaultMessageSender implements MessageSender {
@Deprecated
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes,
- String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()),
callback, msgUUID, timeout, timeUnit);
}
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+ }
+
+ /**
+ * async send single message
+ *
+ * @param callback callback can be null
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, 1);
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
- groupId, streamId, "");
+ groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompressEnd) {
sender.asyncSendMessage(new EncodeObject(body, "groupId="
- + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+ + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy" + proxySend,
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
new EncodeObject(body, "groupId=" + groupId + "&streamId="
- + streamId + "&dt=" + dt, idGenerator.getNextId(),
+ + streamId + "&dt=" + dt + proxySend, idGenerator.getNextId(),
this.getMsgtype(), false, groupId), callback,
msgUUID, timeout, timeUnit);
}
@@ -360,14 +483,38 @@ public class DefaultMessageSender implements MessageSender {
}
- public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId,
- long dt, String msgUUID, long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
+ throws ProxysdkException {
+ asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * async send single message
+ *
+ * @param callback callback can be null
+ * @param body message data
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report timestamp
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend)
+ throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, 1);
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -392,45 +539,97 @@ public class DefaultMessageSender implements MessageSender {
}
}
+ public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
+ long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+ }
+
+ /**
+ * async send a batch of messages
+ *
+ * @param callback callback can be null
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report time
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @throws ProxysdkException
+ */
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
+ String proxySend = "";
+ if (isProxySend) {
+ proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+ }
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(), isCompress,
isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
- groupId, streamId, "");
+ groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
+ if (isProxySend) {
+ proxySend = "&" + proxySend;
+ }
if (isCompress) {
sender.asyncSendMessage(
new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(),
+ + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
+ idGenerator.getNextId(),
this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
- new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt
- + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(),
- false, groupId), callback, msgUUID, timeout, timeUnit);
+ new EncodeObject(bodyList,
+ "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cnt=" + bodyList.size()
+ + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId),
+ callback, msgUUID, timeout, timeUnit);
}
}
}
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) throws ProxysdkException {
+ asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+ }
+
+ /**
+ * async send a batch of messages
+ *
+ * @param callback callback can be null
+ * @param bodyList list of messages
+ * @param groupId groupId
+ * @param streamId streamId
+ * @param dt data report time
+ * @param msgUUID msg uuid
+ * @param timeout
+ * @param timeUnit
+ * @param extraAttrMap extra attributes
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(SendMessageCallback callback,
+ List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
- if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
+ extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
+ if (isProxySend) {
+ extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+ }
StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
@@ -465,28 +664,60 @@ public class DefaultMessageSender implements MessageSender {
* @throws ProxysdkException
*/
@Override
- public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body,
- SendMessageCallback callback) throws ProxysdkException {
+ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback)
+ throws ProxysdkException {
this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
}
/**
- * asyncSendMessage
+ * async send single message
*
- * @param inlongGroupId
- * @param inlongStreamId
- * @param bodyList
- * @param callback
+ * @param inlongGroupId groupId
+ * @param inlongStreamId streamId
+ * @param body a single message
+ * @param callback callback can be null
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback,
+ boolean isProxySend) throws ProxysdkException {
+ this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
+ idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend);
+ }
+
+ /**
+ * async send a batch of messages
+ *
+ * @param inlongGroupId groupId
+ * @param inlongStreamId streamId
+ * @param bodyList list of messages
+ * @param callback callback can be null
* @throws ProxysdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
- SendMessageCallback callback) throws ProxysdkException {
+ SendMessageCallback callback) throws ProxysdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
}
+ /**
+ * async send a batch of messages
+ *
+ * @param inlongGroupId groupId
+ * @param inlongStreamId streamId
+ * @param bodyList list of messages
+ * @param callback callback can be null
+ * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+ * @throws ProxysdkException
+ */
+ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
+ SendMessageCallback callback, boolean isProxySend) throws ProxysdkException {
+ this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
+ idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend);
+ }
+
private void addIndexCnt(String groupId, String streamId, long cnt) {
try {
String key = groupId + "|" + streamId;
@@ -501,10 +732,10 @@ public class DefaultMessageSender implements MessageSender {
}
}
- public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
- String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ @Deprecated
+ public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, String streamId,
+ long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -523,9 +754,9 @@ public class DefaultMessageSender implements MessageSender {
}
}
- private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
- String streamId, long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
+ @Deprecated
+ private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid,
+ String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -538,23 +769,23 @@ public class DefaultMessageSender implements MessageSender {
}
}
- public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ @Deprecated
+ public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
+ int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout,
timeUnit, "minute");
}
- public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
- String streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ @Deprecated
+ public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
+ int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit,
"file");
}
- public String sendMessageData(List<byte[]> bodyList, String groupId,
- String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ @Deprecated
+ public String sendMessageData(List<byte[]> bodyList, String groupId, String streamId, long dt, int sid,
+ boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -574,9 +805,9 @@ public class DefaultMessageSender implements MessageSender {
return null;
}
- private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
- String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) {
+ @Deprecated
+ private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
+ long timeout, TimeUnit timeUnit, String messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
@@ -589,14 +820,15 @@ public class DefaultMessageSender implements MessageSender {
return null;
}
- public String sendMessageProxy(byte[] body, String groupId, String streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ @Deprecated
+ public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
+ String msgUUID, long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}
+ @Deprecated
public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index df1c4939a..fd6b0f275 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -39,7 +39,8 @@ public class ProxyUtils {
static {
Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp",
"cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
- "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey");
+ "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey",
+ "proxySend", "errMsg", "errCode");
}
public static boolean isAttrKeysValid(Map<String, String> attrsMap) {