You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/31 02:31:22 UTC
[GitHub] [inlong] EMsnap commented on a diff in pull request #6325: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side
EMsnap commented on code in PR #6325:
URL: https://github.com/apache/inlong/pull/6325#discussion_r1008984868
##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java:
##########
@@ -56,33 +63,193 @@ public static boolean isSyncSendForOrder(Event event) {
}
/**
- * Convert String to ByteBuf
+ * process response to client
+ * @param commonAttrMap attribute map
+ * @param resultMap result map
+ * @param remoteChannel client channel
+ * @param msgType the message type
+ */
+ public static void returnSourceRspPackage(Map<String, String> commonAttrMap,
+ Map<String, Object> resultMap,
+ Channel remoteChannel,
+ MsgType msgType) throws Exception {
+ ByteBuf binBuffer;
+ String origAttrs = null;
+ final StringBuilder strBuff = new StringBuilder(512);
+ // build message bytes
+ if (MsgType.MSG_UNKNOWN.equals(msgType)) {
+ if (logCounter.shouldPrint()) {
+ logger.warn("Unknown msgType message from {}, discard it!", remoteChannel);
+ }
+ return;
+ }
+ if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+ binBuffer = buildHeartBeatMsgRspPackage();
+ } else {
+ // check whether return response message
+ String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+ if ("false".equalsIgnoreCase(isAck)) {
+ return;
+ }
+ origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
+ // check whether channel is writable
+ if (!remoteChannel.isWritable()) {
+ strBuff.append("Send buffer is full1, so disconnect ")
+ .append(remoteChannel).append(", attr is ").append(origAttrs);
+ if (logCounter.shouldPrint()) {
+ logger.warn(strBuff.toString());
+ }
+ throw new Exception(strBuff.toString());
+ }
+ // build return attribute string
+ String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+ String errMsg = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+ strBuff.append(ConfigConstants.DATAPROXY_IP_KEY)
+ .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(getLocalIp());
+ if (StringUtils.isNotEmpty(errCode)) {
+ strBuff.append(AttributeConstants.SEPARATOR)
+ .append(AttributeConstants.MESSAGE_PROCESS_ERRCODE)
+ .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errCode);
+ }
+ if (StringUtils.isNotEmpty(errMsg)) {
+ strBuff.append(AttributeConstants.SEPARATOR)
+ .append(AttributeConstants.MESSAGE_PROCESS_ERRMSG)
+ .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errMsg);
+ }
+ if (StringUtils.isNotEmpty(origAttrs)) {
+ strBuff.append(AttributeConstants.SEPARATOR).append(origAttrs);
+ }
+ String destAttrs = strBuff.toString();
+ // build response message bytes
+ if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+ binBuffer = buildBinMsgRspPackage(destAttrs,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
+ } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+ binBuffer = buildHBRspPackage(destAttrs,
+ (Byte)resultMap.get(ConfigConstants.VERSION_TYPE), 0);
+ } else {
+ // MsgType.MSG_ACK_SERVICE.equals(msgType)
+ // MsgType.MSG_ORIGINAL_RETURN.equals(msgType)
+ // MsgType.MSG_MULTI_BODY.equals(msgType)
+ // MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)
+ binBuffer = buildDefMsgRspPackage(msgType, destAttrs);
+ }
+ }
+ // send response message
+ if (remoteChannel.isWritable()) {
+ remoteChannel.writeAndFlush(binBuffer);
+ } else {
+ // release allocated ByteBuf
+ binBuffer.release();
+ strBuff.delete(0, strBuff.length());
+ strBuff.append("Send buffer is full2, so disconnect ")
+ .append(remoteChannel).append(", attr is ").append(origAttrs);
+ if (logCounter.shouldPrint()) {
+ logger.warn(strBuff.toString());
+ }
+ throw new Exception(strBuff.toString());
+ }
+ }
+
+ /**
+ * Build hearbeat(1)-msg response message ByteBuf
+ *
+ * @return ByteBuf
+ */
+ private static ByteBuf buildHeartBeatMsgRspPackage() {
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
+ buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+ return buffer;
+ }
+
+ /**
+ * Build default-msg response message ByteBuf
*
- * @param backattrs
- * @param msgType message type
+ * @param msgType the message type
+ * @param attrs the return attribute
+ * @return ByteBuf
+ */
+ private static ByteBuf buildDefMsgRspPackage(MsgType msgType, String attrs) {
+ int attrsLen = 0;
+ int bodyLen = 0;
+ if (attrs != null) {
+ attrsLen = attrs.length();
+ }
+ int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
+ buffer.writeInt(backTotalLen);
+ buffer.writeByte(msgType.getValue());
+ buffer.writeInt(bodyLen);
+ buffer.writeInt(attrsLen);
+ if (attrsLen > 0) {
+ buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+ }
+ return buffer;
+ }
+
+ /**
+ * Build bin-msg response message ByteBuf
+ *
+ * @param attrs the return attribute
* @param sequenceId sequence Id
* @return ByteBuf
*/
- public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+ public static ByteBuf buildBinMsgRspPackage(String attrs, String sequenceId) {
+ // calculate total length
int binTotalLen = 1 + 4 + 2 + 2;
- if (null != backattrs) {
- binTotalLen += backattrs.length();
+ if (null != attrs) {
+ binTotalLen += attrs.length();
}
+ // allocate buffer and write fields
ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
- binBuffer.writeByte(msgType.getValue());
-
+ binBuffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
long uniqVal = Long.parseLong(sequenceId);
byte[] uniq = new byte[4];
uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
uniq[1] = (byte) ((uniqVal >> 16) & 0xFF);
uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
uniq[3] = (byte) (uniqVal & 0xFF);
binBuffer.writeBytes(uniq);
+ if (null != attrs) {
+ binBuffer.writeShort(attrs.length());
+ binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+ } else {
+ binBuffer.writeShort(0x0);
+ }
+ binBuffer.writeShort(0xee01);
+ return binBuffer;
+ }
- if (null != backattrs) {
- binBuffer.writeShort(backattrs.length());
- binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+ /**
+ * Build heartbeat response message ByteBuf
+ *
+ * @param attrs the attribute string
+ * @param version the version
+ * @param loadValue the node load value
+ * @return ByteBuf
+ */
+ private static ByteBuf buildHBRspPackage(String attrs, byte version, int loadValue) {
+ // calculate total length
+ int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;
Review Comment:
guess it's nessasary to explain the numbers for different part here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org