You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/31 02:31:22 UTC

[GitHub] [inlong] EMsnap commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

EMsnap commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008984868


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java:
##########
@@ -56,33 +63,193 @@ public static boolean isSyncSendForOrder(Event event) {
     }
 
     /**
-     * Convert String to ByteBuf
+     *  process response to client
+     * @param commonAttrMap attribute map
+     * @param resultMap     result map
+     * @param remoteChannel client channel
+     * @param msgType       the message type
+     */
+    public static void returnSourceRspPackage(Map<String, String> commonAttrMap,
+                                              Map<String, Object> resultMap,
+                                              Channel remoteChannel,
+                                              MsgType msgType) throws Exception {
+        ByteBuf binBuffer;
+        String origAttrs = null;
+        final StringBuilder strBuff = new StringBuilder(512);
+        // build message bytes
+        if (MsgType.MSG_UNKNOWN.equals(msgType)) {
+            if (logCounter.shouldPrint()) {
+                logger.warn("Unknown msgType message from {}, discard it!", remoteChannel);
+            }
+            return;
+        }
+        if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+            binBuffer = buildHeartBeatMsgRspPackage();
+        } else {
+            // check whether return response message
+            String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+            if ("false".equalsIgnoreCase(isAck)) {
+                return;
+            }
+            origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
+            // check whether channel is writable
+            if (!remoteChannel.isWritable()) {
+                strBuff.append("Send buffer is full1, so disconnect ")
+                        .append(remoteChannel).append(", attr is ").append(origAttrs);
+                if (logCounter.shouldPrint()) {
+                    logger.warn(strBuff.toString());
+                }
+                throw new Exception(strBuff.toString());
+            }
+            // build return attribute string
+            String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+            String errMsg = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+            strBuff.append(ConfigConstants.DATAPROXY_IP_KEY)
+                    .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(getLocalIp());
+            if (StringUtils.isNotEmpty(errCode)) {
+                strBuff.append(AttributeConstants.SEPARATOR)
+                        .append(AttributeConstants.MESSAGE_PROCESS_ERRCODE)
+                        .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errCode);
+            }
+            if (StringUtils.isNotEmpty(errMsg)) {
+                strBuff.append(AttributeConstants.SEPARATOR)
+                        .append(AttributeConstants.MESSAGE_PROCESS_ERRMSG)
+                        .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errMsg);
+            }
+            if (StringUtils.isNotEmpty(origAttrs)) {
+                strBuff.append(AttributeConstants.SEPARATOR).append(origAttrs);
+            }
+            String destAttrs = strBuff.toString();
+            // build response message bytes
+            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+                binBuffer = buildBinMsgRspPackage(destAttrs,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
+            } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+                binBuffer = buildHBRspPackage(destAttrs,
+                        (Byte)resultMap.get(ConfigConstants.VERSION_TYPE), 0);
+            } else {
+                // MsgType.MSG_ACK_SERVICE.equals(msgType)
+                // MsgType.MSG_ORIGINAL_RETURN.equals(msgType)
+                // MsgType.MSG_MULTI_BODY.equals(msgType)
+                // MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)
+                binBuffer = buildDefMsgRspPackage(msgType, destAttrs);
+            }
+        }
+        // send response message
+        if (remoteChannel.isWritable()) {
+            remoteChannel.writeAndFlush(binBuffer);
+        } else {
+            // release allocated ByteBuf
+            binBuffer.release();
+            strBuff.delete(0, strBuff.length());
+            strBuff.append("Send buffer is full2, so disconnect ")
+                    .append(remoteChannel).append(", attr is ").append(origAttrs);
+            if (logCounter.shouldPrint()) {
+                logger.warn(strBuff.toString());
+            }
+            throw new Exception(strBuff.toString());
+        }
+    }
+
+    /**
+     * Build hearbeat(1)-msg response message ByteBuf
+     *
+     * @return ByteBuf
+     */
+    private static ByteBuf buildHeartBeatMsgRspPackage() {
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
+        buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+        return buffer;
+    }
+
+    /**
+     * Build default-msg response message ByteBuf
      *
-     * @param backattrs
-     * @param msgType message type
+     * @param msgType  the message type
+     * @param attrs    the return attribute
+     * @return ByteBuf
+     */
+    private static ByteBuf buildDefMsgRspPackage(MsgType msgType, String attrs) {
+        int attrsLen = 0;
+        int bodyLen = 0;
+        if (attrs != null) {
+            attrsLen = attrs.length();
+        }
+        int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
+        buffer.writeInt(backTotalLen);
+        buffer.writeByte(msgType.getValue());
+        buffer.writeInt(bodyLen);
+        buffer.writeInt(attrsLen);
+        if (attrsLen > 0) {
+            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+        }
+        return buffer;
+    }
+
+    /**
+     * Build bin-msg response message ByteBuf
+     *
+     * @param attrs   the return attribute
      * @param sequenceId sequence Id
      * @return ByteBuf
      */
-    public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+    public static ByteBuf buildBinMsgRspPackage(String attrs, String sequenceId) {
+        // calculate total length
         int binTotalLen = 1 + 4 + 2 + 2;
-        if (null != backattrs) {
-            binTotalLen += backattrs.length();
+        if (null != attrs) {
+            binTotalLen += attrs.length();
         }
+        // allocate buffer and write fields
         ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
         binBuffer.writeInt(binTotalLen);
-        binBuffer.writeByte(msgType.getValue());
-
+        binBuffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
         long uniqVal = Long.parseLong(sequenceId);
         byte[] uniq = new byte[4];
         uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
         uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
         uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
         uniq[3] = (byte) (uniqVal & 0xFF);
         binBuffer.writeBytes(uniq);
+        if (null != attrs) {
+            binBuffer.writeShort(attrs.length());
+            binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+        } else {
+            binBuffer.writeShort(0x0);
+        }
+        binBuffer.writeShort(0xee01);
+        return binBuffer;
+    }
 
-        if (null != backattrs) {
-            binBuffer.writeShort(backattrs.length());
-            binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+    /**
+     * Build heartbeat response message ByteBuf
+     *
+     * @param attrs     the attribute string
+     * @param version   the version
+     * @param loadValue the node load value
+     * @return ByteBuf
+     */
+    private static ByteBuf buildHBRspPackage(String attrs, byte version, int loadValue) {
+        // calculate total length
+        int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;

Review Comment:
   I guess it's necessary to explain what each number in this length calculation stands for (i.e., which field of the package each part represents).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org