Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/29 14:56:49 UTC

[GitHub] [inlong] gosonzhang opened a new pull request, #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

gosonzhang opened a new pull request, #6325:
URL: https://github.com/apache/inlong/pull/6325

   
   
   - Fixes #6324
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gosonzhang commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008999757


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java:
##########
@@ -56,33 +63,197 @@ public static boolean isSyncSendForOrder(Event event) {
     }
 
     /**
-     * Convert String to ByteBuf
+     *  process response to client
+     * @param commonAttrMap attribute map
+     * @param resultMap     result map
+     * @param remoteChannel client channel
+     * @param msgType       the message type
+     */
+    public static void returnSourceRspPackage(Map<String, String> commonAttrMap,

Review Comment:
   Ok





[GitHub] [inlong] gosonzhang merged pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
gosonzhang merged PR #6325:
URL: https://github.com/apache/inlong/pull/6325




[GitHub] [inlong] gosonzhang commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008820916


##########
inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+public enum DataProxyErrCode {
+
+    ERR_CODE_SUCCESS(0, "Ok"),

Review Comment:
   The `ERR_CODE` prefix is just part of the enum constant's name; only the numeric value is exchanged when components interact, so this is purely a matter of naming habit.





[GitHub] [inlong] EMsnap commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008984868


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java:
##########
@@ -56,33 +63,193 @@ public static boolean isSyncSendForOrder(Event event) {
     }
 
     /**
-     * Convert String to ByteBuf
+     *  process response to client
+     * @param commonAttrMap attribute map
+     * @param resultMap     result map
+     * @param remoteChannel client channel
+     * @param msgType       the message type
+     */
+    public static void returnSourceRspPackage(Map<String, String> commonAttrMap,
+                                              Map<String, Object> resultMap,
+                                              Channel remoteChannel,
+                                              MsgType msgType) throws Exception {
+        ByteBuf binBuffer;
+        String origAttrs = null;
+        final StringBuilder strBuff = new StringBuilder(512);
+        // build message bytes
+        if (MsgType.MSG_UNKNOWN.equals(msgType)) {
+            if (logCounter.shouldPrint()) {
+                logger.warn("Unknown msgType message from {}, discard it!", remoteChannel);
+            }
+            return;
+        }
+        if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+            binBuffer = buildHeartBeatMsgRspPackage();
+        } else {
+            // check whether return response message
+            String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+            if ("false".equalsIgnoreCase(isAck)) {
+                return;
+            }
+            origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
+            // check whether channel is writable
+            if (!remoteChannel.isWritable()) {
+                strBuff.append("Send buffer is full1, so disconnect ")
+                        .append(remoteChannel).append(", attr is ").append(origAttrs);
+                if (logCounter.shouldPrint()) {
+                    logger.warn(strBuff.toString());
+                }
+                throw new Exception(strBuff.toString());
+            }
+            // build return attribute string
+            String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+            String errMsg = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+            strBuff.append(ConfigConstants.DATAPROXY_IP_KEY)
+                    .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(getLocalIp());
+            if (StringUtils.isNotEmpty(errCode)) {
+                strBuff.append(AttributeConstants.SEPARATOR)
+                        .append(AttributeConstants.MESSAGE_PROCESS_ERRCODE)
+                        .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errCode);
+            }
+            if (StringUtils.isNotEmpty(errMsg)) {
+                strBuff.append(AttributeConstants.SEPARATOR)
+                        .append(AttributeConstants.MESSAGE_PROCESS_ERRMSG)
+                        .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errMsg);
+            }
+            if (StringUtils.isNotEmpty(origAttrs)) {
+                strBuff.append(AttributeConstants.SEPARATOR).append(origAttrs);
+            }
+            String destAttrs = strBuff.toString();
+            // build response message bytes
+            if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+                binBuffer = buildBinMsgRspPackage(destAttrs,
+                        commonAttrMap.get(AttributeConstants.UNIQ_ID));
+            } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+                binBuffer = buildHBRspPackage(destAttrs,
+                        (Byte)resultMap.get(ConfigConstants.VERSION_TYPE), 0);
+            } else {
+                // MsgType.MSG_ACK_SERVICE.equals(msgType)
+                // MsgType.MSG_ORIGINAL_RETURN.equals(msgType)
+                // MsgType.MSG_MULTI_BODY.equals(msgType)
+                // MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)
+                binBuffer = buildDefMsgRspPackage(msgType, destAttrs);
+            }
+        }
+        // send response message
+        if (remoteChannel.isWritable()) {
+            remoteChannel.writeAndFlush(binBuffer);
+        } else {
+            // release allocated ByteBuf
+            binBuffer.release();
+            strBuff.delete(0, strBuff.length());
+            strBuff.append("Send buffer is full2, so disconnect ")
+                    .append(remoteChannel).append(", attr is ").append(origAttrs);
+            if (logCounter.shouldPrint()) {
+                logger.warn(strBuff.toString());
+            }
+            throw new Exception(strBuff.toString());
+        }
+    }
+
+    /**
+     * Build hearbeat(1)-msg response message ByteBuf
+     *
+     * @return ByteBuf
+     */
+    private static ByteBuf buildHeartBeatMsgRspPackage() {
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
+        buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+        return buffer;
+    }
+
+    /**
+     * Build default-msg response message ByteBuf
      *
-     * @param backattrs
-     * @param msgType message type
+     * @param msgType  the message type
+     * @param attrs    the return attribute
+     * @return ByteBuf
+     */
+    private static ByteBuf buildDefMsgRspPackage(MsgType msgType, String attrs) {
+        int attrsLen = 0;
+        int bodyLen = 0;
+        if (attrs != null) {
+            attrsLen = attrs.length();
+        }
+        int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
+        buffer.writeInt(backTotalLen);
+        buffer.writeByte(msgType.getValue());
+        buffer.writeInt(bodyLen);
+        buffer.writeInt(attrsLen);
+        if (attrsLen > 0) {
+            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+        }
+        return buffer;
+    }
+
+    /**
+     * Build bin-msg response message ByteBuf
+     *
+     * @param attrs   the return attribute
      * @param sequenceId sequence Id
      * @return ByteBuf
      */
-    public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+    public static ByteBuf buildBinMsgRspPackage(String attrs, String sequenceId) {
+        // calculate total length
         int binTotalLen = 1 + 4 + 2 + 2;
-        if (null != backattrs) {
-            binTotalLen += backattrs.length();
+        if (null != attrs) {
+            binTotalLen += attrs.length();
         }
+        // allocate buffer and write fields
         ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
         binBuffer.writeInt(binTotalLen);
-        binBuffer.writeByte(msgType.getValue());
-
+        binBuffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
         long uniqVal = Long.parseLong(sequenceId);
         byte[] uniq = new byte[4];
         uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
         uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
         uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
         uniq[3] = (byte) (uniqVal & 0xFF);
         binBuffer.writeBytes(uniq);
+        if (null != attrs) {
+            binBuffer.writeShort(attrs.length());
+            binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+        } else {
+            binBuffer.writeShort(0x0);
+        }
+        binBuffer.writeShort(0xee01);
+        return binBuffer;
+    }
 
-        if (null != backattrs) {
-            binBuffer.writeShort(backattrs.length());
-            binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+    /**
+     * Build heartbeat response message ByteBuf
+     *
+     * @param attrs     the attribute string
+     * @param version   the version
+     * @param loadValue the node load value
+     * @return ByteBuf
+     */
+    private static ByteBuf buildHBRspPackage(String attrs, byte version, int loadValue) {
+        // calculate total length
+        int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;

Review Comment:
   I think it's necessary to explain what each number in this sum stands for.
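
One way to act on this comment is to name each field width instead of summing bare literals. The sketch below does that for the bin-msg response, whose layout can be read off the writes in `buildBinMsgRspPackage` in the diff above (length prefix, message type, 4-byte sequence id, 2-byte attribute length, attributes, trailing `0xee01`). It is not the PR's code: the class name `BinRspFrame`, the constant names, and the `msgType` parameter are hypothetical, and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` so the example has no dependencies. The layout of `buildHBRspPackage` is not visible in the quoted hunk, so it is not reconstructed here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch only: names every field that contributes to the bin-msg response length. */
final class BinRspFrame {
    // Field widths in bytes, read off the writes in buildBinMsgRspPackage.
    static final int LEN_PREFIX_SIZE = 4;  // writeInt(binTotalLen)
    static final int MSG_TYPE_SIZE = 1;    // writeByte(msgType)
    static final int UNIQ_ID_SIZE = 4;     // low 32 bits of the sequence id
    static final int ATTR_LEN_SIZE = 2;    // writeShort(attribute length)
    static final int MAGIC_SIZE = 2;       // writeShort(0xee01)
    static final int MAGIC_VALUE = 0xee01;

    private BinRspFrame() {
    }

    /** msgType is passed in because the numeric value of MSG_BIN_MULTI_BODY is not shown in the diff. */
    static byte[] build(int msgType, long uniqId, String attrs) {
        // Encode once so the length field counts bytes, not chars.
        byte[] attrBytes = attrs == null ? new byte[0] : attrs.getBytes(StandardCharsets.UTF_8);
        if (attrBytes.length > 0xFFFF) {
            throw new IllegalArgumentException("attributes do not fit in a 2-byte length field");
        }
        int totalLen = MSG_TYPE_SIZE + UNIQ_ID_SIZE + ATTR_LEN_SIZE + attrBytes.length + MAGIC_SIZE;
        ByteBuffer buf = ByteBuffer.allocate(LEN_PREFIX_SIZE + totalLen); // big-endian by default
        buf.putInt(totalLen);
        buf.put((byte) msgType);
        buf.putInt((int) uniqId);                 // same four bytes as the manual shifts in the diff
        buf.putShort((short) attrBytes.length);
        buf.put(attrBytes);
        buf.putShort((short) MAGIC_VALUE);
        return buf.array();
    }
}
```

With an empty attribute string, `BinRspFrame.build(7, 1L, "")` returns 13 bytes: 4 for the length prefix plus the 9 counted by `1 + 4 + 2 + 2` in the diff (the `7` is a placeholder message type). Taking the length from the encoded bytes rather than `attrs.length()` is a deliberate difference from the diff; the two agree only for ASCII attributes.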





[GitHub] [inlong] gosonzhang commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
gosonzhang commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1009007662


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java:
##########
@@ -243,7 +242,7 @@ private void sendResponse(OrderEvent orderEvent, String inlongGroupId, String in
                     logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
                             inlongGroupId, inlongStreamId);
                 }
-                ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+                ByteBuf binBuffer = MessageUtils.buildBinMsgRspPackage("", sequenceId);

Review Comment:
   DONE





[GitHub] [inlong] pocozh commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
pocozh commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008994823


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java:
##########
@@ -56,33 +63,197 @@ public static boolean isSyncSendForOrder(Event event) {
     }
 
     /**
-     * Convert String to ByteBuf
+     *  process response to client
+     * @param commonAttrMap attribute map
+     * @param resultMap     result map
+     * @param remoteChannel client channel
+     * @param msgType       the message type
+     */
+    public static void returnSourceRspPackage(Map<String, String> commonAttrMap,

Review Comment:
   Suggest removing "return" from the method name.



##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java:
##########
@@ -243,7 +242,7 @@ private void sendResponse(OrderEvent orderEvent, String inlongGroupId, String in
                     logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
                             inlongGroupId, inlongStreamId);
                 }
-                ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+                ByteBuf binBuffer = MessageUtils.buildBinMsgRspPackage("", sequenceId);

Review Comment:
   Suggest adding the `dpIp` attribute here.
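
For context, the attribute string that `returnSourceRspPackage` assembles in `MessageUtils.java` starts with the DataProxy IP and then appends the error code, the error message, and the original attributes when they are non-empty. A minimal sketch of that assembly follows, which could equally supply the attribute string for the order-message response here. The separator characters (`=` and `&`) and the key names `errCode` and `errMsg` are assumptions, since the diff shows only the constant names and not their values; `dpIp` is taken from the comment above, and the class name `RspAttrs` is hypothetical.

```java
/** Sketch only: mirrors the attribute assembly order in returnSourceRspPackage. */
final class RspAttrs {
    static final String DATAPROXY_IP_KEY = "dpIp";  // name taken from the review comment
    static final String KEY_VALUE_SEPARATOR = "=";  // assumed value
    static final String SEPARATOR = "&";            // assumed value
    static final String ERRCODE_KEY = "errCode";    // assumed key name
    static final String ERRMSG_KEY = "errMsg";      // assumed key name

    private RspAttrs() {
    }

    static String build(String localIp, String errCode, String errMsg, String origAttrs) {
        StringBuilder sb = new StringBuilder(512);
        // The DataProxy IP always comes first.
        sb.append(DATAPROXY_IP_KEY).append(KEY_VALUE_SEPARATOR).append(localIp);
        appendPair(sb, ERRCODE_KEY, errCode);
        appendPair(sb, ERRMSG_KEY, errMsg);
        // The client's original attributes are echoed back unchanged at the end.
        if (notEmpty(origAttrs)) {
            sb.append(SEPARATOR).append(origAttrs);
        }
        return sb.toString();
    }

    private static void appendPair(StringBuilder sb, String key, String value) {
        if (notEmpty(value)) {
            sb.append(SEPARATOR).append(key).append(KEY_VALUE_SEPARATOR).append(value);
        }
    }

    private static boolean notEmpty(String s) {
        return s != null && !s.isEmpty();
    }
}
```

Under these assumptions, `RspAttrs.build("127.0.0.1", null, null, null)` yields `dpIp=127.0.0.1`, which is the kind of string that could replace the empty `""` argument in the call above.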





[GitHub] [inlong] healchow commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008809450


##########
inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+public enum DataProxyErrCode {
+
+    ERR_CODE_SUCCESS(0, "Ok"),

Review Comment:
   Personally, I don't think it is necessary to add the `ERR_CODE` prefix, because the enumeration class name already indicates that all of its constants are error codes.
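
To make the suggestion concrete, here is a rough sketch of the enum without the prefix. Only the first constant's value and text (`0`, `"Ok"`) come from the diff; the second constant, the field and accessor names, and the `fromCode` lookup are hypothetical additions for illustration, not part of the PR.

```java
/** Sketch only: DataProxyErrCode with the ERR_CODE prefix dropped from constant names. */
enum DataProxyErrCode {

    SUCCESS(0, "Ok"),                    // value and text taken from the diff
    UNKNOWN_ERROR(99, "Unknown error");  // hypothetical entry, not from the PR

    private final int errCode;
    private final String errMsg;

    DataProxyErrCode(int errCode, String errMsg) {
        this.errCode = errCode;
        this.errMsg = errMsg;
    }

    int getErrCode() {
        return errCode;
    }

    String getErrMsg() {
        return errMsg;
    }

    /** Maps a numeric code back to a constant; unmatched codes fall back to UNKNOWN_ERROR. */
    static DataProxyErrCode fromCode(int code) {
        for (DataProxyErrCode value : values()) {
            if (value.errCode == code) {
                return value;
            }
        }
        return UNKNOWN_ERROR;
    }
}
```

A call site then reads `DataProxyErrCode.SUCCESS.getErrCode()`, so the class name already supplies the "error code" context. If only the numeric value is exchanged between components, renaming the constants would not change what is sent.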


