You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/11/21 11:51:24 UTC
[inlong] branch master updated: [INLONG-6592][DataProxy] Fix the problem of missing OrderEvent (#6596)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a22c2e034 [INLONG-6592][DataProxy] Fix the problem of missing OrderEvent (#6596)
a22c2e034 is described below
commit a22c2e034aa15dee3aea6212581e71529a7031ca
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Nov 21 19:51:18 2022 +0800
[INLONG-6592][DataProxy] Fix the problem of missing OrderEvent (#6596)
---
.../dataproxy/sink/mq/OrderBatchPackProfileV0.java | 50 +++++++++++++++++++---
.../sink/mq/SimpleBatchPackProfileV0.java | 2 +-
2 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java
index 8322bd786..fdb384cd6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java
@@ -18,17 +18,18 @@
package org.apache.inlong.dataproxy.sink.mq;
import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.inlong.dataproxy.base.OrderEvent;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.source.MsgType;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
/**
* SimpleBatchPackProfileV0
@@ -38,7 +39,7 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
public static final Logger LOG = LoggerFactory.getLogger(OrderBatchPackProfileV0.class);
- private OrderEvent orderProfile;
+ private SinkRspEvent orderProfile;
/**
* Constructor
@@ -56,7 +57,7 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
* @param event
* @return
*/
- public static OrderBatchPackProfileV0 create(OrderEvent event) {
+ public static OrderBatchPackProfileV0 create(SinkRspEvent event) {
Map<String, String> headers = event.getHeaders();
String inlongGroupId = headers.get(AttributeConstants.GROUP_ID);;
String inlongStreamId = headers.get(AttributeConstants.STREAM_ID);
@@ -75,7 +76,7 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
* get event
* @return the event
*/
- public OrderEvent getOrderProfile() {
+ public SinkRspEvent getOrderProfile() {
return orderProfile;
}
@@ -97,9 +98,44 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
LOG.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
this.getInlongGroupId(), this.getInlongStreamId());
}
- ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+ ByteBuf binBuffer = getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
orderProfile.getCtx().writeAndFlush(binBuffer);
});
}
}
+
+ /**
+ * Convert String to ByteBuf
+ *
+ * @param backattrs
+ * @param msgType message type
+ * @param sequenceId sequence Id
+ * @return ByteBuf
+ */
+ public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+ int binTotalLen = 1 + 4 + 2 + 2;
+ if (null != backattrs) {
+ binTotalLen += backattrs.length();
+ }
+ ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
+ binBuffer.writeInt(binTotalLen);
+ binBuffer.writeByte(msgType.getValue());
+
+ long uniqVal = Long.parseLong(sequenceId);
+ byte[] uniq = new byte[4];
+ uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
+ uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
+ uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
+ uniq[3] = (byte) (uniqVal & 0xFF);
+ binBuffer.writeBytes(uniq);
+
+ if (null != backattrs) {
+ binBuffer.writeShort(backattrs.length());
+ binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+ } else {
+ binBuffer.writeShort(0x0);
+ }
+ binBuffer.writeShort(0xee01);
+ return binBuffer;
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
index 8b2c82d3b..f27eef62d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
@@ -19,7 +19,7 @@ package org.apache.inlong.dataproxy.sink.mq;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import java.util.Map;