You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/11/21 11:51:24 UTC

[inlong] branch master updated: [INLONG-6592][DataProxy] Fix the problem of missing OrderEvent (#6596)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a22c2e034 [INLONG-6592][DataProxy] Fix the problem of missing OrderEvent (#6596)
a22c2e034 is described below

commit a22c2e034aa15dee3aea6212581e71529a7031ca
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Nov 21 19:51:18 2022 +0800

    [INLONG-6592][DataProxy] Fix the problem of missing OrderEvent (#6596)
---
 .../dataproxy/sink/mq/OrderBatchPackProfileV0.java | 50 +++++++++++++++++++---
 .../sink/mq/SimpleBatchPackProfileV0.java          |  2 +-
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java
index 8322bd786..fdb384cd6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/OrderBatchPackProfileV0.java
@@ -18,17 +18,18 @@
 package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.inlong.dataproxy.base.OrderEvent;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.base.SinkRspEvent;
 import org.apache.inlong.dataproxy.source.MsgType;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 /**
  * SimpleBatchPackProfileV0
@@ -38,7 +39,7 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
 
     public static final Logger LOG = LoggerFactory.getLogger(OrderBatchPackProfileV0.class);
 
-    private OrderEvent orderProfile;
+    private SinkRspEvent orderProfile;
 
     /**
      * Constructor
@@ -56,7 +57,7 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
      * @param event
      * @return
      */
-    public static OrderBatchPackProfileV0 create(OrderEvent event) {
+    public static OrderBatchPackProfileV0 create(SinkRspEvent event) {
         Map<String, String> headers = event.getHeaders();
         String inlongGroupId = headers.get(AttributeConstants.GROUP_ID);;
         String inlongStreamId = headers.get(AttributeConstants.STREAM_ID);
@@ -75,7 +76,7 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
      * get event
      * @return the event
      */
-    public OrderEvent getOrderProfile() {
+    public SinkRspEvent getOrderProfile() {
         return orderProfile;
     }
 
@@ -97,9 +98,44 @@ public class OrderBatchPackProfileV0 extends BatchPackProfile {
                     LOG.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
                             this.getInlongGroupId(), this.getInlongStreamId());
                 }
-                ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+                ByteBuf binBuffer = getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
                 orderProfile.getCtx().writeAndFlush(binBuffer);
             });
         }
     }
+
+    /**
+     * Build the binary response frame that acknowledges an order message.
+     *
+     * Layout: total length (int), message type (byte), low 32 bits of the sequence id
+     * (big-endian), attribute length (short), attribute bytes (UTF-8), 0xee01 (short).
+     *
+     * @param backattrs attributes echoed back in the frame; null is written as zero length
+     * @param msgType message type whose value is written as a single byte
+     * @param sequenceId decimal sequence id; only its low 32 bits are written
+     * @return newly allocated ByteBuf holding the frame
+     * @throws NumberFormatException if sequenceId is not a parsable long
+     */
+    public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+        // Parse before allocating so a malformed sequence id cannot leak a buffer.
+        long uniqVal = Long.parseLong(sequenceId);
+
+        // Size the frame from the encoded bytes: String.length() counts chars, which
+        // differs from the UTF-8 byte count once the attributes contain non-ASCII text.
+        byte[] attrBytes = new byte[0];
+        if (null != backattrs) {
+            attrBytes = backattrs.getBytes(StandardCharsets.UTF_8);
+        }
+        int binTotalLen = 1 + 4 + 2 + attrBytes.length + 2;
+
+        ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
+        binBuffer.writeInt(binTotalLen);
+        binBuffer.writeByte(msgType.getValue());
+        // writeInt emits the same four big-endian bytes the low half of the id occupies.
+        binBuffer.writeInt((int) uniqVal);
+        binBuffer.writeShort(attrBytes.length);
+        binBuffer.writeBytes(attrBytes);
+        binBuffer.writeShort(0xee01);
+        return binBuffer;
+    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
index 8b2c82d3b..f27eef62d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
@@ -19,7 +19,7 @@ package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
-import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
 import java.util.Map;