You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:04:25 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 07e429e8f [INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916)
07e429e8f is described below
commit 07e429e8f0d5c99d3779eedf67afc779a58bbb2a
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 16 17:13:21 2022 +0800
[INLONG-5915][DataProxy] Optimize source's ServerMessageHandler class (#5916)
---
.../dataproxy/http/SimpleMessageHandler.java | 1 -
.../dataproxy/source/DefaultServiceDecoder.java | 253 ++++++------
.../dataproxy/source/ServerMessageHandler.java | 436 +++++++++------------
.../inlong/dataproxy/source/ServiceDecoder.java | 11 +-
.../dataproxy/source/SimpleMessageHandler.java | 6 +-
5 files changed, 313 insertions(+), 394 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 3f1bf425c..0689039e1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -153,7 +153,6 @@ public class SimpleMessageHandler implements MessageHandler {
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
byte[] data = inLongMsg.buildArray();
- headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
Event event = EventBuilder.withBody(data, headers);
inLongMsg.reset();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 355dc7a2e..11ac95e4e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -18,28 +18,27 @@
package org.apache.inlong.dataproxy.source;
import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
-import org.apache.inlong.dataproxy.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class DefaultServiceDecoder implements ServiceDecoder {
private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
@@ -109,33 +108,19 @@ public class DefaultServiceDecoder implements ServiceDecoder {
return resultMap;
}
- private void handleDateTime(Map<String, String> commonAttrMap, Channel channel,
- long uniq, long dataTime, int msgCount) {
+ private void handleDateTime(Map<String, String> commonAttrMap, long uniq,
+ long dataTime, int msgCount, String strRemoteIP,
+ long msgRcvTime) {
commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
- String time = "";
- if (commonAttrMap.containsKey(Constants.HEADER_KEY_MSG_TIME)) {
- time = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
- } else {
- time = String.valueOf(dataTime);
- }
- StringBuilder sidBuilder = new StringBuilder();
- /*
- * udp need use msgEvent get remote address
- */
- String remoteAddress = "";
- if (channel != null && channel.remoteAddress() != null) {
- remoteAddress = channel.remoteAddress().toString();
- }
- sidBuilder.append(remoteAddress).append("#").append(time)
- .append("#").append(uniq);
- commonAttrMap.put(AttributeConstants.SEQUENCE_ID, new String(sidBuilder));
-
- // datetime from sdk
+ String time = String.valueOf(dataTime);
+ commonAttrMap.put(AttributeConstants.SEQUENCE_ID,
+ new StringBuilder(256).append(strRemoteIP)
+ .append("#").append(time).append("#").append(uniq).toString());
+ // dt from sdk
commonAttrMap.put(AttributeConstants.DATA_TIME, String.valueOf(dataTime));
commonAttrMap
- .put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
- commonAttrMap.put(AttributeConstants.MESSAGE_COUNT,
- String.valueOf(msgCount != 0 ? msgCount : 1));
+ .put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(msgCount));
}
private boolean handleExtMap(Map<String, String> commonAttrMap, ByteBuf cb,
@@ -165,18 +150,18 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int extendField,
- int msgHeadPos, int totalDataLen, int attrLen, String strAttr, int bodyLen) {
+ int msgHeadPos, int totalDataLen, int attrLen,
+ String strAttr, int bodyLen, long msgRcvTime) {
// whether enable trace
- boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
ByteBuffer dataBuf;
+ boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
if (!enableTrace) {
dataBuf = ByteBuffer.allocate(totalDataLen + BIN_MSG_TOTALLEN_SIZE);
cb.getBytes(msgHeadPos, dataBuf.array(), 0,
totalDataLen + BIN_MSG_TOTALLEN_SIZE);
} else {
- String traceInfo;
+ // get local address
String strNode2Ip = null;
-
SocketAddress loacalSockAddr = channel.localAddress();
if (null != loacalSockAddr) {
strNode2Ip = loacalSockAddr.toString();
@@ -187,11 +172,9 @@ public class DefaultServiceDecoder implements ServiceDecoder {
strNode2Ip, loacalSockAddr);
}
}
-
- traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + System.currentTimeMillis();
-
+ // build trace information
int newTotalLen = 0;
-
+ String traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + msgRcvTime;
if (attrLen != 0) {
newTotalLen = totalDataLen + traceInfo.length() + "&".length();
strAttr = strAttr + "&" + traceInfo;
@@ -199,7 +182,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
newTotalLen = totalDataLen + traceInfo.length();
strAttr = traceInfo;
}
-
+ // build trace information bytes
dataBuf = ByteBuffer.allocate(newTotalLen + BIN_MSG_TOTALLEN_SIZE);
cb.getBytes(msgHeadPos, dataBuf.array(), 0,
bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE
@@ -207,11 +190,9 @@ public class DefaultServiceDecoder implements ServiceDecoder {
dataBuf.putShort(
bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE - BIN_MSG_MAGIC_SIZE),
(short) strAttr.length());
-
System.arraycopy(strAttr.getBytes(StandardCharsets.UTF_8), 0, dataBuf.array(),
bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_MAGIC_SIZE),
strAttr.length());
-
dataBuf.putInt(0, newTotalLen);
dataBuf.putShort(newTotalLen + BIN_MSG_TOTALLEN_SIZE - BIN_MSG_MAGIC_SIZE,
(short) 0xee01);
@@ -223,57 +204,51 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* extract bin data, message type is 7
*/
private Map<String, Object> extractNewBinData(Map<String, Object> resultMap,
- ByteBuf cb, Channel channel,
- int totalDataLen, MsgType msgType) throws Exception {
+ ByteBuf cb, Channel channel,
+ int totalDataLen, MsgType msgType,
+ String strRemoteIP,
+ long msgRcvTime) throws Exception {
int msgHeadPos = cb.readerIndex() - 5;
-
+ // get body length
int bodyLen = cb.getInt(msgHeadPos + BIN_MSG_BODYLEN_OFFSET);
+ if (bodyLen == 0) {
+ throw new Exception(
+ "Error msg, bodyLen is empty; connection info:" + strRemoteIP);
+ }
+ // get attribute length
int attrLen = cb.getShort(msgHeadPos + BIN_MSG_BODY_OFFSET + bodyLen);
+ // get msg magic
int msgMagic = cb.getUnsignedShort(msgHeadPos + BIN_MSG_BODY_OFFSET
+ bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen);
-
- if (bodyLen == 0) {
- throw new Exception(new Throwable("err msg, bodyLen is empty"
- + ";Connection info:" + channel.toString()));
- }
-
- if ((totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE))
- || (msgMagic != BIN_MSG_MAGIC)) {
- throw new Exception(new Throwable(
- "err msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen="
- + bodyLen + ",totalDataLen=" + totalDataLen + ",attrLen=" + attrLen
+ if ((msgMagic != BIN_MSG_MAGIC)
+ || (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_MSG_FORMAT_SIZE))) {
+ throw new Exception(
+ "Error msg, bodyLen + attrLen > totalDataLen,or msgMagic is valid! and bodyLen="
+ + bodyLen + ",attrLen=" + attrLen + ",totalDataLen=" + totalDataLen
+ ";magic=" + Integer.toHexString(msgMagic)
- + ";Connection info:" + channel.toString()));
+ + "; connection info:" + strRemoteIP);
}
-
+ // read data from ByteBuf
int groupIdNum = cb.readUnsignedShort();
int streamIdNum = cb.readUnsignedShort();
final int extendField = cb.readUnsignedShort();
long dataTime = cb.readUnsignedInt();
+ dataTime = dataTime * 1000;
int msgCount = cb.readUnsignedShort();
+ msgCount = (msgCount != 0) ? msgCount : 1;
long uniq = cb.readUnsignedInt();
-
- dataTime = dataTime * 1000;
- Map<String, String> commonAttrMap = new HashMap<String, String>();
cb.skipBytes(BIN_MSG_BODYLEN_SIZE + bodyLen + BIN_MSG_ATTRLEN_SIZE);
- resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap);
-
- resultMap.put(ConfigConstants.EXTRA_ATTR, ((extendField & 0x1) == 0x1) ? "true" : "false");
-
// read body data
byte[] bodyData = new byte[bodyLen];
cb.getBytes(msgHeadPos + BIN_MSG_BODY_OFFSET, bodyData, 0, bodyLen);
- resultMap.put(ConfigConstants.DECODER_BODY, bodyData);
-
// read attr and write to map.
String strAttr = null;
+ Map<String, String> commonAttrMap = new HashMap<>();
if (attrLen != 0) {
byte[] attrData = new byte[attrLen];
cb.readBytes(attrData, 0, attrLen);
strAttr = new String(attrData, StandardCharsets.UTF_8);
- LOG.debug("strAttr = {}, length = {}", strAttr, strAttr.length());
resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr);
-
try {
commonAttrMap.putAll(mapSplitter.split(strAttr));
} catch (Exception e) {
@@ -281,26 +256,23 @@ public class DefaultServiceDecoder implements ServiceDecoder {
throw new MessageIDException(uniq,
ErrorCode.ATTR_ERROR,
new Throwable("[Parse Error]new six segment protocol ,attr is "
- + strAttr + " , channel info:" + channel.toString()));
+ + strAttr + " , channel info:" + strRemoteIP));
}
}
-
+ // build attributes
+ resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap);
+ resultMap.put(ConfigConstants.EXTRA_ATTR, ((extendField & 0x1) == 0x1) ? "true" : "false");
+ resultMap.put(ConfigConstants.DECODER_BODY, bodyData);
try {
- handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount);
- final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
+ // handle common attribute information
+ handleDateTime(commonAttrMap, uniq, dataTime, msgCount, strRemoteIP, msgRcvTime);
+ final boolean isIndexMsg =
+ handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
- totalDataLen, attrLen, strAttr, bodyLen);
-
- String groupId = null;
- String streamId = null;
-
- if (commonAttrMap.containsKey(AttributeConstants.GROUP_ID)) {
- groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
- }
- if (commonAttrMap.containsKey(AttributeConstants.STREAM_ID)) {
- streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
- }
-
+ totalDataLen, attrLen, strAttr, bodyLen, msgRcvTime);
+ // Check if groupId and streamId are number-to-name
+ String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
+ String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
if ((groupId != null) && (streamId != null)) {
commonAttrMap.put(AttributeConstants.NUM2NAME, "FALSE");
dataBuf.putShort(BIN_MSG_EXTEND_OFFSET, (short) (extendField | 0x4));
@@ -312,15 +284,16 @@ public class DefaultServiceDecoder implements ServiceDecoder {
commonAttrMap.put(AttributeConstants.STREAMID_NUM, String.valueOf(streamIdNum));
}
}
-
- if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType) && !index) {
- List<ProxyMessage> msgList = new ArrayList<>(1);
- msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, dataBuf.array()));
- resultMap.put(ConfigConstants.MSG_LIST, msgList);
- } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+ // build ProxyMessage
+ if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
List<ProxyMessage> msgList = new ArrayList<>(1);
- msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap,
- (byte[]) resultMap.get(ConfigConstants.FILE_BODY)));
+ if (isIndexMsg) {
+ msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap,
+ (byte[]) resultMap.get(ConfigConstants.FILE_BODY)));
+ } else {
+ msgList.add(new ProxyMessage(groupId,
+ streamId, commonAttrMap, dataBuf.array()));
+ }
resultMap.put(ConfigConstants.MSG_LIST, msgList);
}
} catch (Exception ex) {
@@ -328,7 +301,6 @@ public class DefaultServiceDecoder implements ServiceDecoder {
cb.clear();
throw new MessageIDException(uniq, ErrorCode.OTHER_ERROR, ex.getCause());
}
-
return resultMap;
}
@@ -336,97 +308,97 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* extract bin data, message type less than 7
*/
private Map<String, Object> extractDefaultData(Map<String, Object> resultMap,
- ByteBuf cb, Channel channel,
- int totalDataLen, MsgType msgType) throws Exception {
+ ByteBuf cb, int totalDataLen,
+ MsgType msgType, String strRemoteIP,
+ long msgRcvTime) throws Exception {
int bodyLen = cb.readInt();
if (bodyLen == 0) {
- throw new Exception(new Throwable("err msg, bodyLen is empty" + ";"
- + "Connection info:" + channel.toString()));
+ throw new Exception("Error msg: bodyLen is empty, connection info:" + strRemoteIP);
}
// if body len is bigger than totalDataLen - 5(bodyLen bytes + message type bytes),
// that means an invalid message, reject it.
if (bodyLen > totalDataLen - 5) {
- throw new Exception(new Throwable("err msg, firstLen > totalDataLen, and bodyLen="
+ throw new Exception("Error msg, firstLen > totalDataLen, and bodyLen="
+ bodyLen + ",totalDataLen=" + totalDataLen
- + ";Connection info:" + channel.toString()));
+ + ", connection info:" + strRemoteIP);
}
-
// extract body bytes
byte[] bodyData = new byte[bodyLen];
cb.readBytes(bodyData, 0, bodyLen);
resultMap.put(ConfigConstants.DECODER_BODY, bodyData);
-
+ // extract attribute
int attrLen = cb.readInt();
// 9 means bodyLen bytes(4) + message type bytes(1) + attrLen bytes(4)
if (totalDataLen != 9 + attrLen + bodyLen) {
- throw new Exception(new Throwable(
- "err msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + bodyLen
+ throw new Exception(
+ "Error msg, totalDataLen != 9 + bodyLen + attrLen,and bodyLen=" + bodyLen
+ ",totalDataLen=" + totalDataLen + ",attrDataLen=" + attrLen
- + ";Connection info:" + channel.toString()));
+ + ", connection info:" + strRemoteIP);
}
-
// extract attr bytes
byte[] attrData = new byte[attrLen];
cb.readBytes(attrData, 0, attrLen);
- String strAttr = new String(attrData, StandardCharsets.UTF_8);
- resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr);
-
// convert attr bytes to map
Map<String, String> commonAttrMap;
+ String strAttr = new String(attrData, StandardCharsets.UTF_8);
try {
commonAttrMap = new HashMap<>(mapSplitter.split(strAttr));
} catch (Exception e) {
- throw new Exception(new Throwable("Parse commonAttrMap error.commonAttrString is: "
- + strAttr + " ,channel is :" + channel.toString()));
+ throw new Exception("Parse commonAttrMap error.commonAttrString is: "
+ + strAttr + " , connection info:" + strRemoteIP);
}
+ resultMap.put(ConfigConstants.DECODER_ATTRS, strAttr);
resultMap.put(ConfigConstants.COMMON_ATTR_MAP, commonAttrMap);
-
// decompress body data if compress type exists.
String compressType = commonAttrMap.get(AttributeConstants.COMPRESS_TYPE);
- resultMap.put(ConfigConstants.COMPRESS_TYPE, compressType);
if (StringUtils.isNotBlank(compressType)) {
+ resultMap.put(ConfigConstants.COMPRESS_TYPE, compressType);
byte[] unCompressedData = processUnCompress(bodyData, compressType);
if (unCompressedData == null || unCompressedData.length == 0) {
- throw new Exception(new Throwable("Uncompressed data error!compress type:"
- + compressType + ";data:" + new String(bodyData, StandardCharsets.UTF_8)
- + ";attr:" + strAttr + ";channel:" + channel.toString()));
+ throw new Exception("Uncompressed data error! compress type:"
+ + compressType + ";attr:" + strAttr
+ + " , connection info:" + strRemoteIP);
}
bodyData = unCompressedData;
}
-
// fill up attr map with some keys.
- commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(System.currentTimeMillis()));
String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
-
- // add message count attr
- String cntStr = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
- int msgCnt = cntStr != null ? Integer.parseInt(cntStr) : 1;
- commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(msgCnt));
-
+ String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME);
+ long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime);
+ commonAttrMap.put(AttributeConstants.DATA_TIME, String.valueOf(longDataTime));
+ commonAttrMap.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
+ // check message count attr
+ String strMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
+ int intMsgCnt = NumberUtils.toInt(strMsgCnt, 1);
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(intMsgCnt));
// extract data from bodyData and if message type is 5, convert data into list.
+ int calCnt = 0;
List<ProxyMessage> msgList = null;
ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData);
if (MsgType.MSG_MULTI_BODY.equals(msgType)) {
- msgList = new ArrayList<>(msgCnt);
+ msgList = new ArrayList<>(intMsgCnt);
while (bodyBuffer.remaining() > 0) {
int singleMsgLen = bodyBuffer.getInt();
if (singleMsgLen <= 0 || singleMsgLen > bodyBuffer.remaining()) {
- throw new Exception(new Throwable("[Malformed Data]Invalid data len!channel is "
- + channel.toString()));
+ throw new Exception(
+ "[Malformed Data]Invalid data len! channel is " + strRemoteIP);
}
byte[] record = new byte[singleMsgLen];
bodyBuffer.get(record);
-
ProxyMessage message = new ProxyMessage(groupId, streamId, commonAttrMap, record);
msgList.add(message);
+ calCnt++;
}
} else {
msgList = new ArrayList<>(1);
msgList.add(new ProxyMessage(groupId, streamId, commonAttrMap, bodyData));
+ calCnt++;
+ }
+ if (calCnt != intMsgCnt) {
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(calCnt));
}
resultMap.put(ConfigConstants.MSG_LIST, msgList);
-
return resultMap;
}
@@ -452,7 +424,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* +--------+--------+--------+----------------+--------+----------------+------------------------+
*/
@Override
- public Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception {
+ public Map<String, Object> extractData(ByteBuf cb, String strRemoteIP,
+ long msgRcvTime, Channel channel) throws Exception {
Map<String, Object> resultMap = new HashMap<>();
if (null == cb) {
LOG.error("cb == null");
@@ -460,8 +433,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
int totalLen = cb.readableBytes();
if (ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen) {
- throw new Exception(new Throwable("err msg, ConfigConstants.MSG_MAX_LENGTH_BYTES "
- + "< totalLen, and totalLen=" + totalLen));
+ throw new Exception("Error msg, ConfigConstants.MSG_MAX_LENGTH_BYTES "
+ + "< totalLen, and totalLen=" + totalLen);
}
// save index, reset it if buffer is not satisfied.
cb.markReaderIndex();
@@ -481,14 +454,16 @@ public class DefaultServiceDecoder implements ServiceDecoder {
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
return extractNewBinHB(resultMap, cb, channel, totalDataLen);
}
-
+ // process data message
if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) {
resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : "");
- return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType);
+ return extractNewBinData(resultMap, cb,
+ channel, totalDataLen, msgType,
+ strRemoteIP, msgRcvTime);
} else {
- return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType);
+ return extractDefaultData(resultMap, cb,
+ totalDataLen, msgType, strRemoteIP, msgRcvTime);
}
-
} else {
// reset index.
cb.resetReaderIndex();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 9d3b32c4e..ab9055ad6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,8 +17,20 @@
package org.apache.inlong.dataproxy.source;
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
+import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
+
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@@ -26,7 +38,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
@@ -40,37 +51,16 @@ import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
-import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-
/**
* Server message handler
*
@@ -91,10 +81,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private static final DateTimeFormatter DATE_FORMATTER
- = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
- private static final ZoneId defZoneId = ZoneId.systemDefault();
-
private AbstractSource source;
private final ChannelGroup allChannels;
@@ -180,8 +166,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
strRemoteIp = strRemoteIp.substring(1, strRemoteIp.indexOf(':'));
} catch (Exception ee) {
logger.warn("fail to get the remote IP, and strIP={},remoteSocketAddress={}",
- strRemoteIp,
- remoteSocketAddress);
+ strRemoteIp, remoteSocketAddress);
}
}
return strRemoteIp;
@@ -269,10 +254,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
- Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+ Map<String, String> attrMap, AtomicReference<String> topicInfo) {
String groupId = message.getGroupId();
String streamId = message.getStreamId();
if (null != groupId) {
+ // get configured group Id
String from = commonAttrMap.get(AttributeConstants.FROM);
if ("dc".equals(from)) {
String dcInterfaceId = message.getStreamId();
@@ -284,14 +270,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
message.setGroupId(groupId);
}
}
-
- String value = MessageUtils.getTopic(configManager.getTopicProperties(), groupId,
- streamId);
- if (StringUtils.isNotEmpty(value)) {
- topicInfo.set(value.trim());
+ // get configured topic name
+ String configTopic = MessageUtils.getTopic(
+ configManager.getTopicProperties(), groupId, streamId);
+ if (StringUtils.isNotEmpty(configTopic)) {
+ topicInfo.set(configTopic.trim());
}
-
- Map<String, String> mxValue = configManager.getMxPropertiesMaps().get(groupId);
+ // get configured mx value
+ Map<String, String> mxValue =
+ configManager.getMxPropertiesMaps().get(groupId);
if (mxValue != null && mxValue.size() != 0) {
message.getAttributeMap().putAll(mxValue);
} else {
@@ -301,7 +288,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
-
+ // get configured groupId and streamId by numbers
if (configManager.getGroupIdMappingProperties() != null
&& configManager.getStreamIdMappingProperties() != null) {
groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
@@ -311,21 +298,21 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
String enableTrans =
(configManager.getGroupIdEnableMappingProperties() == null)
? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
- if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
- .equalsIgnoreCase(num2name))) {
+ if (("TRUE".equalsIgnoreCase(enableTrans)
+ && "TRUE".equalsIgnoreCase(num2name))) {
String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
message.setData(newBinMsg(message.getData(), extraAttr));
}
-
+ // reset groupId and streamId to message and attrMap
attrMap.put(AttributeConstants.GROUP_ID, groupId);
attrMap.put(AttributeConstants.STREAM_ID, streamId);
message.setGroupId(groupId);
message.setStreamId(streamId);
-
- String value = MessageUtils.getTopic(configManager.getTopicProperties(),
- groupId, streamId);
- if (StringUtils.isNotEmpty(value)) {
- topicInfo.set(value.trim());
+ // get configured topic name
+ String configTopic = MessageUtils.getTopic(
+ configManager.getTopicProperties(), groupId, streamId);
+ if (StringUtils.isNotEmpty(configTopic)) {
+ topicInfo.set(configTopic.trim());
}
}
}
@@ -333,60 +320,28 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
private boolean updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP, MsgType msgType) {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP) {
for (ProxyMessage message : msgList) {
- Map<String, String> attrMap = message.getAttributeMap();
-
String topic = this.defaultTopic;
-
+ Map<String, String> attrMap = message.getAttributeMap();
AtomicReference<String> topicInfo = new AtomicReference<>(topic);
checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
String groupId = message.getGroupId();
String streamId = message.getStreamId();
+ if (streamId == null) {
+ streamId = "";
+ message.setStreamId(streamId);
+ }
topic = topicInfo.get();
if (StringUtils.isEmpty(topic)) {
logger.warn("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}",
groupId, streamId);
}
- // if(groupId==null)groupId="b_test";//default groupId
-
+ // append topic
message.setTopic(topic);
commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
-
- // whether sla
- if (SLA_METRIC_GROUPID.equals(groupId)) {
- commonAttrMap.put(SLA_METRIC_DATA, "true");
- message.setTopic(SLA_METRIC_DATA);
- }
-
- if (groupId != null && streamId != null) {
- String tubeSwtichKey = groupId + SEPARATOR + streamId;
- if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null
- && "false".equals(configManager.getTubeSwitchProperties()
- .get(tubeSwtichKey).trim())) {
- continue;
- }
- }
-
- if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
- && !MsgType.MSG_MULTI_BODY.equals(msgType)
- && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
- byte[] data = message.getData();
- if (data[data.length - 1] == '\n') {
- int tripDataLen = data.length - 1;
- if (data[data.length - 2] == '\r') {
- tripDataLen = data.length - 2;
- }
- byte[] tripData = new byte[tripDataLen];
- System.arraycopy(data, 0, tripData, 0, tripDataLen);
- message.setData(tripData);
- }
- }
-
- if (streamId == null) {
- streamId = "";
- }
+ // add ProxyMessage
HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
.computeIfAbsent(topic, k -> new HashMap<>());
List<ProxyMessage> streamIdMsgList = streamIdMsgMap
@@ -397,8 +352,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP, MsgType msgType) throws MessageIDException {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP, MsgType msgType, long msgRcvTime) throws MessageIDException {
int inLongMsgVer = 1;
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -406,129 +361,124 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
inLongMsgVer = 4;
}
+ StringBuilder strBuff = new StringBuilder(512);
int recordMsgCnt = Integer.parseInt(commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
-
+ // process each ProxyMessage
for (Map.Entry<String, HashMap<String, List<ProxyMessage>>> topicEntry : messageMap.entrySet()) {
for (Map.Entry<String, List<ProxyMessage>> streamIdEntry : topicEntry.getValue().entrySet()) {
-
+ // build InLongMsg
+ String groupId = null;
+ int streamMsgCnt = 0;
InLongMsg inLongMsg = InLongMsg.newInLongMsg(this.isCompressed, inLongMsgVer);
- Map<String, String> headers = new HashMap<String, String>();
- for (ProxyMessage message : streamIdEntry.getValue()) {
- if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+ if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
+ for (ProxyMessage message : streamIdEntry.getValue()) {
+ if (StringUtils.isEmpty(groupId)) {
+ groupId = message.getGroupId();
+ }
+ streamMsgCnt++;
message.getAttributeMap().put(AttributeConstants.MESSAGE_COUNT, String.valueOf(1));
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
- } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+ }
+ } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+ for (ProxyMessage message : streamIdEntry.getValue()) {
+ if (StringUtils.isEmpty(groupId)) {
+ groupId = message.getGroupId();
+ }
+ streamMsgCnt++;
inLongMsg.addMsg(message.getData());
- } else {
+ }
+ } else {
+ for (ProxyMessage message : streamIdEntry.getValue()) {
+ if (StringUtils.isEmpty(groupId)) {
+ groupId = message.getGroupId();
+ }
+ streamMsgCnt++;
inLongMsg.addMsg(mapJoiner.join(message.getAttributeMap()), message.getData());
}
}
- // get msgTime
- long currTIme = System.currentTimeMillis();
- String strMsgTime = commonAttrMap.get(Constants.HEADER_KEY_MSG_TIME);
- long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
- LocalDateTime localDateTime =
- LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
- String pkgTimeStr = DATE_FORMATTER.format(localDateTime);
- headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
- headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
- // get data time
- long dtTime = NumberUtils.toLong(
- commonAttrMap.get(AttributeConstants.DATA_TIME), currTIme);
- headers.put(AttributeConstants.DATA_TIME, String.valueOf(dtTime));
-
+ if (recordMsgCnt != streamMsgCnt) {
+ logger.debug("Found message count not equal, record={}, calculate value = {}",
+ recordMsgCnt, streamMsgCnt);
+ }
+ commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(streamMsgCnt));
+ // build headers
+ Map<String, String> headers = new HashMap<>();
+ headers.put(AttributeConstants.GROUP_ID, groupId);
+ headers.put(AttributeConstants.STREAM_ID, streamIdEntry.getKey());
+ headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
+ String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME);
+ headers.put(AttributeConstants.DATA_TIME, strDataTime);
+ headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
+ headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
+ headers.put(ConfigConstants.MSG_COUNTER_KEY,
+ commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
+ headers.put(AttributeConstants.RCV_TIME,
+ commonAttrMap.get(AttributeConstants.RCV_TIME));
+ // add extra key-value information
+ headers.put(AttributeConstants.UNIQ_ID,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
if ("false".equals(commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
}
-
String syncSend = commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND);
if (StringUtils.isNotEmpty(syncSend)) {
headers.put(AttributeConstants.MESSAGE_SYNC_SEND, syncSend);
}
-
String partitionKey = commonAttrMap.get(AttributeConstants.MESSAGE_PARTITION_KEY);
if (StringUtils.isNotEmpty(partitionKey)) {
headers.put(AttributeConstants.MESSAGE_PARTITION_KEY, partitionKey);
}
-
- headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
- headers.put(AttributeConstants.GROUP_ID,
- streamIdEntry.getValue().get(0).getGroupId());
- headers.put(AttributeConstants.STREAM_ID, streamIdEntry.getKey());
- headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
- headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
- // every message share the same msg cnt? what if msgType = 5
- String proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
- if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) || MsgType.MSG_MULTI_BODY.equals(msgType)) {
- commonAttrMap.put(AttributeConstants.MESSAGE_COUNT, String.valueOf(recordMsgCnt));
- proxyMetricMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);
- }
- headers.put(ConfigConstants.MSG_COUNTER_KEY, proxyMetricMsgCnt);
-
- byte[] data = inLongMsg.buildArray();
- headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
-
- headers.put(AttributeConstants.UNIQ_ID,
- commonAttrMap.get(AttributeConstants.UNIQ_ID));
String sequenceId = commonAttrMap.get(AttributeConstants.SEQUENCE_ID);
if (StringUtils.isNotEmpty(sequenceId)) {
- StringBuilder sidBuilder = new StringBuilder();
- sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
+ strBuff.append(topicEntry.getKey()).append(SEPARATOR)
+ .append(streamIdEntry.getKey())
.append(SEPARATOR).append(sequenceId);
- headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
+ headers.put(ConfigConstants.SEQUENCE_ID, strBuff.toString());
+ strBuff.delete(0, strBuff.length());
}
+ final byte[] data = inLongMsg.buildArray();
Event event = EventBuilder.withBody(data, headers);
+ inLongMsg.reset();
+ // build metric data item
String orderType = "non-order";
if (MessageUtils.isSyncSendForOrder(event)) {
event = new OrderEvent(ctx, event);
orderType = "order";
}
- long dtten = 0;
- try {
- dtten = Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
- } catch (Exception e1) {
- long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
- throw new MessageIDException(uniqVal,
- ErrorCode.DT_ERROR,
- new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
- + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
- + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
- }
-
- dtten = dtten / 1000 / 60 / 10;
- dtten = dtten * 1000 * 60 * 10;
- StringBuilder newbase = new StringBuilder();
- newbase.append(protocolType).append(SEPARATOR)
+ long longDataTime = Long.parseLong(strDataTime);
+ longDataTime = longDataTime / 1000 / 60 / 10;
+ longDataTime = longDataTime * 1000 * 60 * 10;
+ strBuff.append(protocolType).append(SEPARATOR)
.append(topicEntry.getKey()).append(SEPARATOR)
.append(streamIdEntry.getKey()).append(SEPARATOR)
.append(strRemoteIP).append(SEPARATOR)
.append(NetworkUtils.getLocalIp()).append(SEPARATOR)
.append(orderType).append(SEPARATOR)
- .append(new SimpleDateFormat("yyyyMMddHHmm")
- .format(dtten)).append(SEPARATOR).append(pkgTimeStr);
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEPARATOR)
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
try {
processor.processEvent(event);
monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
this.addMetric(true, data.length, event);
- monitorIndex.addAndGet(new String(newbase),
- Integer.parseInt(proxyMetricMsgCnt), 1, data.length, 0);
+ monitorIndex.addAndGet(strBuff.toString(),
+ streamMsgCnt, 1, data.length, 0);
+ strBuff.delete(0, strBuff.length());
} catch (Throwable ex) {
logger.error("Error writting to channel,data will discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
- monitorIndex.addAndGet(new String(newbase), 0, 0, 0,
- Integer.parseInt(proxyMetricMsgCnt));
+ monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt);
this.addMetric(false, data.length, event);
+ strBuff.delete(0, strBuff.length());
throw new ChannelException("ProcessEvent error can't write event to channel.");
}
}
}
}
- private void responsePackage(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
- Map<String, Object> resultMap,
- Channel remoteChannel,
- SocketAddress remoteSocketAddress,
- MsgType msgType) throws Exception {
+ private void responsePackage(Map<String, String> commonAttrMap,
+ Map<String, Object> resultMap,
+ Channel remoteChannel,
+ MsgType msgType) throws Exception {
String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
if (isAck == null || "true".equals(isAck)) {
if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
@@ -571,25 +521,22 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
}
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- String backattrs = null;
- if (resultMap.containsKey(ConfigConstants.DECODER_ATTRS)) {
- backattrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
- }
+ String backAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID);
- ByteBuf binBuffer = MessageUtils.getResponsePackage(backattrs, msgType, uniqVal);
+ ByteBuf binBuffer = MessageUtils.getResponsePackage(backAttrs, msgType, uniqVal);
if (remoteChannel.isWritable()) {
remoteChannel.writeAndFlush(binBuffer);
logger.debug("Connection info: {} ; attr is {} ; uniqVal {}",
- remoteChannel, backattrs, uniqVal);
+ remoteChannel, backAttrs, uniqVal);
} else {
binBuffer.release();
logger.warn(
"the send buffer2 is full, so disconnect it!please check remote client"
+ "; Connection info:" + remoteChannel + ";attr is "
- + backattrs);
+ + backAttrs);
throw new Exception(new Throwable(
"the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
- + remoteChannel + ";attr is " + backattrs));
+ + remoteChannel + ";attr is " + backAttrs));
}
}
}
@@ -597,9 +544,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- logger.debug("message received");
if (msg == null) {
- logger.error("get null msg, just skip");
+ logger.error("Get null msg, just skip!");
this.addMetric(false, 0, null);
return;
}
@@ -609,26 +555,27 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
String strRemoteIP = getRemoteIp(remoteChannel);
int len = cb.readableBytes();
if (len == 0 && this.filterEmptyMsg) {
- logger.warn("skip empty msg.");
+ logger.warn("Get empty msg from {}, just skip!", strRemoteIP);
this.addMetric(false, 0, null);
return;
}
-
+ // parse message
Map<String, Object> resultMap = null;
+ final long msgRcvTime = System.currentTimeMillis();
try {
- resultMap = serviceDecoder.extractData(cb, remoteChannel);
+ resultMap = serviceDecoder.extractData(cb,
+ strRemoteIP, msgRcvTime, remoteChannel);
+ if (resultMap == null || resultMap.isEmpty()) {
+ logger.info("Parse message result is null, from {}", strRemoteIP);
+ this.addMetric(false, 0, null);
+ return;
+ }
} catch (MessageIDException ex) {
logger.error("MessageIDException ex = {}", ex);
this.addMetric(false, 0, null);
throw new IOException(ex.getCause());
}
-
- if (resultMap == null) {
- logger.info("result is null");
- this.addMetric(false, 0, null);
- return;
- }
-
+ // process message by msgType
MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
@@ -637,90 +584,83 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
this.addMetric(false, 0, null);
return;
}
-
+ // process binary heartbeat message (MSG_BIN_HEARTBEAT, type 8)
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
this.addMetric(false, 0, null);
return;
}
-
+ // process data message
Map<String, String> commonAttrMap =
(Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
if (commonAttrMap == null) {
commonAttrMap = new HashMap<String, String>();
}
-
List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
-
boolean checkMessageTopic = true;
- if (msgList != null
- && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
- && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
- new HashMap<String, HashMap<String, List<ProxyMessage>>>(
- msgList.size());
-
- checkMessageTopic = updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP,
- msgType);
- if (checkMessageTopic) {
- formatMessagesAndSend(ctx, commonAttrMap, messageMap,
- strRemoteIP, msgType);
- }
- } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "filestatus");
- headers.put(ConfigConstants.FILE_CHECK_DATA,
- "true");
- headers.put(AttributeConstants.UNIQ_ID,
- commonAttrMap.get(AttributeConstants.UNIQ_ID));
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
- Event event = EventBuilder.withBody(body, headers);
- if (MessageUtils.isSyncSendForOrder(commonAttrMap
- .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
- event = new OrderEvent(ctx, event);
- }
- try {
- processor.processEvent(event);
- this.addMetric(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
+ if (msgList != null) {
+ if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
+ // process file check data
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("msgtype", "filestatus");
+ headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
+ headers.put(AttributeConstants.UNIQ_ID,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
+ for (ProxyMessage message : msgList) {
+ byte[] body = message.getData();
+ Event event = EventBuilder.withBody(body, headers);
+ if (MessageUtils.isSyncSendForOrder(commonAttrMap
+ .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ event = new OrderEvent(ctx, event);
+ }
+ try {
+ processor.processEvent(event);
+ this.addMetric(true, body.length, event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data will discard.", ex);
+ this.addMetric(false, body.length, event);
+ throw new ChannelException(
+ "Process Controller Event error can't write event to channel.");
+ }
}
- }
- } else if (msgList != null && commonAttrMap
- .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- logger.info("i am in MINUTE_CHECK_DATA");
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "measure");
- headers.put(ConfigConstants.FILE_CHECK_DATA,
- "true");
- headers.put(AttributeConstants.UNIQ_ID,
- commonAttrMap.get(AttributeConstants.UNIQ_ID));
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
- Event event = EventBuilder.withBody(body, headers);
- if (MessageUtils.isSyncSendForOrder(commonAttrMap
- .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
- event = new OrderEvent(ctx, event);
+ } else if (commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+ // process minute check data
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("msgtype", "measure");
+ headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
+ headers.put(AttributeConstants.UNIQ_ID,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
+ for (ProxyMessage message : msgList) {
+ byte[] body = message.getData();
+ Event event = EventBuilder.withBody(body, headers);
+ if (MessageUtils.isSyncSendForOrder(commonAttrMap
+ .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ event = new OrderEvent(ctx, event);
+ }
+ try {
+ processor.processEvent(event);
+ this.addMetric(true, body.length, event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data will discard.", ex);
+ this.addMetric(false, body.length, event);
+ throw new ChannelException(
+ "Process Controller Event error can't write event to channel.");
+ }
}
- try {
- processor.processEvent(event);
- this.addMetric(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
+ } else {
+ // process message data
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+ new HashMap<>(msgList.size());
+ checkMessageTopic = updateMsgList(msgList,
+ commonAttrMap, messageMap, strRemoteIP);
+ if (checkMessageTopic) {
+ formatMessagesAndSend(ctx, commonAttrMap,
+ messageMap, strRemoteIP, msgType, msgRcvTime);
}
}
}
- SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
if (!checkMessageTopic || !MessageUtils.isSyncSendForOrder(commonAttrMap
.get(AttributeConstants.MESSAGE_SYNC_SEND))) {
- responsePackage(ctx, commonAttrMap, resultMap, remoteChannel,
- remoteSocketAddress, msgType);
+ responsePackage(commonAttrMap, resultMap, remoteChannel, msgType);
}
} finally {
cb.release();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 7f4852b5c..3f13f7e1d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -28,10 +28,13 @@ public interface ServiceDecoder {
/**
* extract data from buffer and convert it into map.
*
- * @param cb
- * @param channel
- * @return Map
+ * @param cb the message Byte buffer
+ * @param strRemoteIP the IP address of the remote peer that sent the message
+ * @param msgRcvTime the time at which the message was received
+ * @param channel the channel
+ * @return Map the message map
* @throws Exception
*/
- Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception;
+ Map<String, Object> extractData(ByteBuf cb, String strRemoteIP,
+ long msgRcvTime, Channel channel) throws Exception;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 38ca9ee32..71372da18 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -570,6 +570,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
return;
}
Channel remoteChannel = ctx.channel();
+ String strRemoteIP = getRemoteIp(remoteChannel);
ByteBuf cb = (ByteBuf) msg;
try {
int len = cb.readableBytes();
@@ -580,8 +581,10 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
return;
}
Map<String, Object> resultMap = null;
+ final long msgRcvTime = System.currentTimeMillis();
try {
- resultMap = serviceProcessor.extractData(cb, remoteChannel);
+ resultMap = serviceProcessor.extractData(cb,
+ strRemoteIP, msgRcvTime, remoteChannel);
} catch (MessageIDException ex) {
this.addMetric(false, 0, null);
throw new IOException(ex.getCause());
@@ -617,7 +620,6 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
&& !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
&& !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size());
- String strRemoteIP = getRemoteIp(remoteChannel);
updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);