You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/10/31 05:12:30 UTC
[inlong] branch master updated: [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side (#6325)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6749c733d [INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side (#6325)
6749c733d is described below
commit 6749c733d7a1e22a7fc40b8fa2f775b0a8268fe0
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Oct 31 13:12:25 2022 +0800
[INLONG-6324][DataProxy] Optimize DataProxy's message processing on the source side (#6325)
---
.../inlong/common/enums/DataProxyErrCode.java | 63 +++
.../dataproxy/consts/AttributeConstants.java | 4 +
.../inlong/dataproxy/consts/ConfigConstants.java | 2 +
.../dataproxy/sink/pulsar/PulsarClientService.java | 7 +-
.../dataproxy/source/ServerMessageHandler.java | 452 +++++++++------------
.../inlong/dataproxy/utils/MessageUtils.java | 199 ++++++++-
6 files changed, 441 insertions(+), 286 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
new file mode 100644
index 000000000..4a08319f1
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+public enum DataProxyErrCode {
+
+ SUCCESS(0, "Ok"),
+
+ UNSUPPORTED_MSGTYPE(1, "Unsupported msgType"),
+ EMPTY_MSG(2, "Empty message"),
+ UNSUPPORTED_EXTENDFIELD_VALUE(3,
+ "Unsupported extend field value"),
+ UNCONFIGURED_GROUPID_OR_STREAMID(4,
+ "Un-configured groupId or streamId"),
+
+ UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
+
+ private final int errCode;
+ private final String errMsg;
+
+ DataProxyErrCode(int errorCode, String errorMsg) {
+ this.errCode = errorCode;
+ this.errMsg = errorMsg;
+ }
+
+ public static DataProxyErrCode valueOf(int value) {
+ for (DataProxyErrCode msgErrCode : DataProxyErrCode.values()) {
+ if (msgErrCode.getErrCode() == value) {
+ return msgErrCode;
+ }
+ }
+
+ return UNKNOWN_ERROR;
+ }
+
+ public int getErrCode() {
+ return errCode;
+ }
+
+ public String getErrCodeStr() {
+ return String.valueOf(errCode);
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
index 8cb56c426..45d502750 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/AttributeConstants.java
@@ -90,4 +90,8 @@ public class AttributeConstants {
public static final String MESSAGE_IS_ACK = "isAck";
+ public static final String MESSAGE_PROCESS_ERRCODE = "errCode";
+
+ public static final String MESSAGE_PROCESS_ERRMSG = "errMsg";
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 1ec268016..1ede782ea 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -100,6 +100,8 @@ public class ConfigConstants {
public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
+ public static final String SOURCE_NO_TOPIC_ACCEPT = "source.topic.notfound.accept";
+ public static final String SINK_NO_TOPIC_RESEND = "sink.topic.notfound.resend";
public static final String DECODER_BODY = "body";
public static final String DECODER_TOPICKEY = "topic_key";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index dc0a87b07..c7b0058c7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -17,6 +17,8 @@
package org.apache.inlong.dataproxy.sink.pulsar;
+import static org.apache.inlong.common.util.NetworkUtils.getLocalIp;
+
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
@@ -36,7 +38,6 @@ import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.EventStat;
-import org.apache.inlong.dataproxy.source.MsgType;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -243,7 +244,9 @@ public class PulsarClientService {
logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
inlongGroupId, inlongStreamId);
}
- ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+ String attrs = ConfigConstants.DATAPROXY_IP_KEY
+ + AttributeConstants.KEY_VALUE_SEPARATOR + getLocalIp();
+ ByteBuf binBuffer = MessageUtils.buildBinMsgRspPackage(attrs, sequenceId);
orderEvent.getCtx().writeAndFlush(binBuffer);
});
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index a381f8c1c..3f96e1bf8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.source;
+import static org.apache.inlong.common.util.NetworkUtils.getLocalIp;
import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
@@ -30,9 +31,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -45,7 +44,7 @@ import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.InLongMsg;
-import org.apache.inlong.common.util.NetworkUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.dataproxy.base.OrderEvent;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -247,97 +246,193 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
allChannels.remove(ctx.channel());
}
- private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
- Map<String, String> attrMap, AtomicReference<String> topicInfo) {
- String groupId = message.getGroupId();
- String streamId = message.getStreamId();
- if (null != groupId) {
- // get configured group Id
- String from = commonAttrMap.get(AttributeConstants.FROM);
- if ("dc".equals(from)) {
- String dcInterfaceId = message.getStreamId();
- if (StringUtils.isNotEmpty(dcInterfaceId)
- && configManager.getDcMappingProperties()
- .containsKey(dcInterfaceId.trim())) {
- groupId = configManager.getDcMappingProperties()
- .get(dcInterfaceId.trim()).trim();
- message.setGroupId(groupId);
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg == null) {
+ logger.debug("Get null msg, just skip!");
+ return;
+ }
+ ByteBuf cb = (ByteBuf) msg;
+ try {
+ Channel remoteChannel = ctx.channel();
+ String strRemoteIP = getRemoteIp(remoteChannel);
+ int len = cb.readableBytes();
+ if (len == 0 && this.filterEmptyMsg) {
+ logger.debug("Get empty msg from {}, just skip!", strRemoteIP);
+ return;
+ }
+ // parse message
+ Map<String, Object> resultMap = null;
+ final long msgRcvTime = System.currentTimeMillis();
+ try {
+ resultMap = serviceDecoder.extractData(cb,
+ strRemoteIP, msgRcvTime, remoteChannel);
+ if (resultMap == null || resultMap.isEmpty()) {
+ logger.debug("Parse message result is null, from {}", strRemoteIP);
+ return;
}
+ } catch (MessageIDException ex) {
+ logger.error("MessageIDException ex = {}", ex);
+ throw new IOException(ex.getCause());
}
- // get configured topic name
- String configTopic = MessageUtils.getTopic(
- configManager.getTopicProperties(), groupId, streamId);
- if (StringUtils.isNotEmpty(configTopic)) {
- topicInfo.set(configTopic.trim());
+ // get msgType from parsed result
+ MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
+ // get attribute data from parsed result
+ Map<String, String> commonAttrMap =
+ (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+ if (commonAttrMap == null) {
+ commonAttrMap = new HashMap<>();
}
- // get configured m value
- Map<String, String> mxValue =
- configManager.getMxPropertiesMaps().get(groupId);
- if (mxValue != null && mxValue.size() != 0) {
- message.getAttributeMap().putAll(mxValue);
- } else {
- message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
+ // process heartbeat message
+ if (MsgType.MSG_HEARTBEAT.equals(msgType)
+ || MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ return;
}
- } else {
- String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
- String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
- String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
- // get configured groupId and steamId by numbers
- if (configManager.getGroupIdMappingProperties() != null
- && configManager.getStreamIdMappingProperties() != null) {
- groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
- streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
- ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
- if (groupId != null && streamId != null) {
- String enableTrans =
- (configManager.getGroupIdEnableMappingProperties() == null)
- ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
- if (("TRUE".equalsIgnoreCase(enableTrans)
- && "TRUE".equalsIgnoreCase(num2name))) {
- String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
- message.setData(newBinMsg(message.getData(), extraAttr));
- }
- // reset groupId and streamId to message and attrMap
- attrMap.put(AttributeConstants.GROUP_ID, groupId);
- attrMap.put(AttributeConstants.STREAM_ID, streamId);
- message.setGroupId(groupId);
- message.setStreamId(streamId);
- // get configured topic name
- String configTopic = MessageUtils.getTopic(
- configManager.getTopicProperties(), groupId, streamId);
- if (StringUtils.isNotEmpty(configTopic)) {
- topicInfo.set(configTopic.trim());
- }
- }
+ // reject unsupported messages
+ if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
+ || commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+ commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+ DataProxyErrCode.UNSUPPORTED_EXTENDFIELD_VALUE.getErrCodeStr());
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ return;
+ }
+ // check message's groupId, streamId, topic
+ List<ProxyMessage> msgList =
+ (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
+ if (msgList == null) {
+ commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+ DataProxyErrCode.EMPTY_MSG.getErrCodeStr());
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ return;
+ }
+ // transfer message data
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+ new HashMap<>(msgList.size());
+ if (!convertMsgList(msgList, commonAttrMap, messageMap, strRemoteIP)) {
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ return;
}
+ // send messages to channel
+ formatMessagesAndSend(ctx, commonAttrMap,
+ messageMap, strRemoteIP, msgType, msgRcvTime);
+ // return response
+ if (!MessageUtils.isSyncSendForOrder(
+ commonAttrMap.get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ }
+ } finally {
+ cb.release();
}
}
- private boolean updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP) {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exception caught cause = {}", cause);
+ monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
+ ctx.close();
+ }
+
+ /**
+ * Complete the message content and covert proxy message to map
+ *
+ * @param msgList the message list
+ * @param commonAttrMap common attribute map
+ * @param messageMap message list
+ * @param strRemoteIP remote ip
+ *
+ * @return convert result
+ */
+ private boolean convertMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP) {
for (ProxyMessage message : msgList) {
- String topic = this.defaultTopic;
- Map<String, String> attrMap = message.getAttributeMap();
- AtomicReference<String> topicInfo = new AtomicReference<>(topic);
- checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
+ String configTopic = null;
String groupId = message.getGroupId();
String streamId = message.getStreamId();
+ // get topic by groupId and streamId
+ if (null == groupId) {
+ String num2name = commonAttrMap.get(AttributeConstants.NUM2NAME);
+ String groupIdNum = commonAttrMap.get(AttributeConstants.GROUPID_NUM);
+ String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
+ // get configured groupId and steamId by numbers
+ if (configManager.getGroupIdMappingProperties() != null
+ && configManager.getStreamIdMappingProperties() != null) {
+ groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
+ streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
+ ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+ if (groupId != null && streamId != null) {
+ String enableTrans =
+ (configManager.getGroupIdEnableMappingProperties() == null)
+ ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+ if (("TRUE".equalsIgnoreCase(enableTrans)
+ && "TRUE".equalsIgnoreCase(num2name))) {
+ String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
+ message.setData(newBinMsg(message.getData(), extraAttr));
+ }
+ // reset groupId and streamId to message and attrMap
+ message.setGroupId(groupId);
+ message.setStreamId(streamId);
+ // get configured topic name
+ configTopic = MessageUtils.getTopic(
+ configManager.getTopicProperties(), groupId, streamId);
+ }
+ }
+ } else {
+ // get configured group Id
+ String from = commonAttrMap.get(AttributeConstants.FROM);
+ if ("dc".equals(from)) {
+ String dcInterfaceId = message.getStreamId();
+ if (StringUtils.isNotEmpty(dcInterfaceId)
+ && configManager.getDcMappingProperties()
+ .containsKey(dcInterfaceId.trim())) {
+ groupId = configManager.getDcMappingProperties()
+ .get(dcInterfaceId.trim()).trim();
+ message.setGroupId(groupId);
+ }
+ }
+ // get configured m value
+ Map<String, String> mxValue =
+ configManager.getMxPropertiesMaps().get(groupId);
+ if (mxValue != null && mxValue.size() != 0) {
+ message.getAttributeMap().putAll(mxValue);
+ } else {
+ message.getAttributeMap().putAll(mapSplitter.split(this.defaultMXAttr));
+ }
+ // get configured topic name
+ configTopic = MessageUtils.getTopic(
+ configManager.getTopicProperties(), groupId, streamId);
+ }
+ // check topic configure
+ if (StringUtils.isEmpty(configTopic)) {
+ String acceptMsg =
+ configManager.getCommonProperties().getOrDefault(
+ ConfigConstants.SOURCE_NO_TOPIC_ACCEPT, "false");
+ if ("true".equalsIgnoreCase(acceptMsg)) {
+ configTopic = this.defaultTopic;
+ } else {
+ commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+ DataProxyErrCode.UNCONFIGURED_GROUPID_OR_STREAMID.getErrCodeStr());
+ logger.debug("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}",
+ groupId, streamId);
+ return false;
+ }
+ }
if (streamId == null) {
streamId = "";
message.setStreamId(streamId);
}
- topic = topicInfo.get();
- if (StringUtils.isEmpty(topic)) {
- logger.warn("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}",
- groupId, streamId);
- }
// append topic
- message.setTopic(topic);
+ message.setTopic(configTopic);
commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
// add ProxyMessage
HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
- .computeIfAbsent(topic, k -> new HashMap<>());
+ .computeIfAbsent(configTopic, k -> new HashMap<>());
List<ProxyMessage> streamIdMsgList = streamIdMsgMap
.computeIfAbsent(streamId, k -> new ArrayList<>());
streamIdMsgList.add(message);
@@ -345,6 +440,16 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
return true;
}
+ /**
+ * format message to event and send to channel
+ *
+ * @param ctx client connect
+ * @param commonAttrMap common attribute map
+ * @param messageMap message list
+ * @param strRemoteIP remote ip
+ * @param msgType the message type
+ * @param msgRcvTime the received time
+ */
private void formatMessagesAndSend(ChannelHandlerContext ctx, Map<String, String> commonAttrMap,
Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
String strRemoteIP, MsgType msgType, long msgRcvTime) throws MessageIDException {
@@ -439,7 +544,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
.append(topicEntry.getKey()).append(SEPARATOR)
.append(streamIdEntry.getKey()).append(SEPARATOR)
.append(strRemoteIP).append(SEPARATOR)
- .append(NetworkUtils.getLocalIp()).append(SEPARATOR)
+ .append(getLocalIp()).append(SEPARATOR)
.append(orderType).append(SEPARATOR)
.append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEPARATOR)
.append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
@@ -451,7 +556,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
recordMsgCnt, 1, data.length, 0);
strBuff.delete(0, strBuff.length());
} catch (Throwable ex) {
- logger.error("Error writting to channel,data will discard.", ex);
+ logger.error("Error writting to channel, data will discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, recordMsgCnt);
this.addStatistics(false, data.length, event);
@@ -462,199 +567,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
}
- private void responsePackage(Map<String, String> commonAttrMap,
- Map<String, Object> resultMap,
- Channel remoteChannel,
- MsgType msgType) throws Exception {
- String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
- if (isAck == null || "true".equals(isAck)) {
- if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
- .equals(msgType)
- || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR
- .equals(msgType)) {
- byte[] backAttr = mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
- byte[] backBody = null;
-
- if (backAttr != null && !new String(backAttr, StandardCharsets.UTF_8).isEmpty()) {
- if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
-
- backBody = (byte[]) resultMap.get(ConfigConstants.DECODER_BODY);
- } else {
-
- backBody = new byte[]{50};
- }
- int backTotalLen = 1 + 4 + backBody.length + 4 + backAttr.length;
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
- buffer.writeInt(backTotalLen);
- buffer.writeByte(msgType.getValue());
- buffer.writeInt(backBody.length);
- buffer.writeBytes(backBody);
- buffer.writeInt(backAttr.length);
- buffer.writeBytes(backAttr);
- if (remoteChannel.isWritable()) {
- remoteChannel.writeAndFlush(buffer);
- } else {
- String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
- logger.warn(
- "the send buffer1 is full, so disconnect it!please check remote client"
- + "; Connection info:"
- + remoteChannel + ";attr is " + backAttrStr);
- buffer.release();
- throw new Exception(new Throwable(
- "the send buffer1 is full, so disconnect it!please check remote client"
- +
- "; Connection info:" + remoteChannel + ";attr is "
- + backAttrStr));
- }
- }
- } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- String backAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
- String uniqVal = commonAttrMap.get(AttributeConstants.UNIQ_ID);
- ByteBuf binBuffer = MessageUtils.getResponsePackage(backAttrs, msgType, uniqVal);
- if (remoteChannel.isWritable()) {
- remoteChannel.writeAndFlush(binBuffer);
- logger.debug("Connection info: {} ; attr is {} ; uniqVal {}",
- remoteChannel, backAttrs, uniqVal);
- } else {
- binBuffer.release();
- logger.warn(
- "the send buffer2 is full, so disconnect it!please check remote client"
- + "; Connection info:" + remoteChannel + ";attr is "
- + backAttrs);
- throw new Exception(new Throwable(
- "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
- + remoteChannel + ";attr is " + backAttrs));
- }
- }
- }
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg == null) {
- logger.error("Get null msg, just skip!");
- return;
- }
- ByteBuf cb = (ByteBuf) msg;
- try {
- Channel remoteChannel = ctx.channel();
- String strRemoteIP = getRemoteIp(remoteChannel);
- int len = cb.readableBytes();
- if (len == 0 && this.filterEmptyMsg) {
- logger.warn("Get empty msg from {}, just skip!", strRemoteIP);
- return;
- }
- // parse message
- Map<String, Object> resultMap = null;
- final long msgRcvTime = System.currentTimeMillis();
- try {
- resultMap = serviceDecoder.extractData(cb,
- strRemoteIP, msgRcvTime, remoteChannel);
- if (resultMap == null || resultMap.isEmpty()) {
- logger.info("Parse message result is null, from {}", strRemoteIP);
- return;
- }
- } catch (MessageIDException ex) {
- logger.error("MessageIDException ex = {}", ex);
- throw new IOException(ex.getCause());
- }
- // process message by msgType
- MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
- if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
- ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
- heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
- remoteChannel.writeAndFlush(heartbeatBuffer);
- return;
- }
- // process heart beat 8
- if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
- return;
- }
- // process data message
- Map<String, String> commonAttrMap =
- (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
- if (commonAttrMap == null) {
- commonAttrMap = new HashMap<String, String>();
- }
- List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
- boolean checkMessageTopic = true;
- if (msgList != null) {
- if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
- // process file check data
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "filestatus");
- headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
- headers.put(AttributeConstants.UNIQ_ID,
- commonAttrMap.get(AttributeConstants.UNIQ_ID));
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
- Event event = EventBuilder.withBody(body, headers);
- if (MessageUtils.isSyncSendForOrder(commonAttrMap
- .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
- event = new OrderEvent(ctx, event);
- }
- try {
- processor.processEvent(event);
- this.addStatistics(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addStatistics(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
- }
- }
- } else if (commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- // process minute check data
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "measure");
- headers.put(ConfigConstants.FILE_CHECK_DATA, "true");
- headers.put(AttributeConstants.UNIQ_ID,
- commonAttrMap.get(AttributeConstants.UNIQ_ID));
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
- Event event = EventBuilder.withBody(body, headers);
- if (MessageUtils.isSyncSendForOrder(commonAttrMap
- .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
- event = new OrderEvent(ctx, event);
- }
- try {
- processor.processEvent(event);
- this.addStatistics(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addStatistics(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
- }
- }
- } else {
- // process message data
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
- new HashMap<>(msgList.size());
- checkMessageTopic = updateMsgList(msgList,
- commonAttrMap, messageMap, strRemoteIP);
- if (checkMessageTopic) {
- formatMessagesAndSend(ctx, commonAttrMap,
- messageMap, strRemoteIP, msgType, msgRcvTime);
- }
- }
- }
- if (!checkMessageTopic || !MessageUtils.isSyncSendForOrder(commonAttrMap
- .get(AttributeConstants.MESSAGE_SYNC_SEND))) {
- responsePackage(commonAttrMap, resultMap, remoteChannel, msgType);
- }
- } finally {
- cb.release();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.error("exception caught cause = {}", cause);
- monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
- ctx.close();
- }
-
/**
* add statistics information
*
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index d8835658d..1d268415a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -15,13 +15,17 @@
package org.apache.inlong.dataproxy.utils;
+import static org.apache.inlong.common.util.NetworkUtils.getLocalIp;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
+import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -30,6 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageUtils {
+ // log print count
+ private static final LogCounter logCounter =
+ new LogCounter(10, 100000, 30 * 1000);
private static final Logger logger = LoggerFactory.getLogger(MessageUtils.class);
@@ -39,7 +46,7 @@ public class MessageUtils {
* @return true/false
*/
public static boolean isSyncSendForOrder(String syncSend) {
- if (StringUtils.isNotEmpty(syncSend) && "true".equals(syncSend)) {
+ if (StringUtils.isNotEmpty(syncSend) && "true".equalsIgnoreCase(syncSend)) {
return true;
}
return false;
@@ -56,22 +63,150 @@ public class MessageUtils {
}
/**
- * Convert String to ByteBuf
+ * Return response to client in source
+ * @param commonAttrMap attribute map
+ * @param resultMap result map
+ * @param remoteChannel client channel
+ * @param msgType the message type
+ */
+ public static void sourceReturnRspPackage(Map<String, String> commonAttrMap,
+ Map<String, Object> resultMap,
+ Channel remoteChannel,
+ MsgType msgType) throws Exception {
+ ByteBuf binBuffer;
+ String origAttrs = null;
+ final StringBuilder strBuff = new StringBuilder(512);
+ // build message bytes
+ if (MsgType.MSG_UNKNOWN.equals(msgType)) {
+ if (logCounter.shouldPrint()) {
+ logger.warn("Unknown msgType message from {}, discard it!", remoteChannel);
+ }
+ return;
+ }
+ if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+ binBuffer = buildHeartBeatMsgRspPackage();
+ } else {
+ // check whether return response message
+ String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK);
+ if ("false".equalsIgnoreCase(isAck)) {
+ return;
+ }
+ origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS);
+ // check whether channel is writable
+ if (!remoteChannel.isWritable()) {
+ strBuff.append("Send buffer is full1, so disconnect ")
+ .append(remoteChannel).append(", attr is ").append(origAttrs);
+ if (logCounter.shouldPrint()) {
+ logger.warn(strBuff.toString());
+ }
+ throw new Exception(strBuff.toString());
+ }
+ // build return attribute string
+ String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+ String errMsg = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+ strBuff.append(ConfigConstants.DATAPROXY_IP_KEY)
+ .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(getLocalIp());
+ if (StringUtils.isNotEmpty(errCode)) {
+ strBuff.append(AttributeConstants.SEPARATOR)
+ .append(AttributeConstants.MESSAGE_PROCESS_ERRCODE)
+ .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errCode);
+ }
+ if (StringUtils.isNotEmpty(errMsg)) {
+ strBuff.append(AttributeConstants.SEPARATOR)
+ .append(AttributeConstants.MESSAGE_PROCESS_ERRMSG)
+ .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errMsg);
+ }
+ if (StringUtils.isNotEmpty(origAttrs)) {
+ strBuff.append(AttributeConstants.SEPARATOR).append(origAttrs);
+ }
+ String destAttrs = strBuff.toString();
+ // build response message bytes
+ if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
+ binBuffer = buildBinMsgRspPackage(destAttrs,
+ commonAttrMap.get(AttributeConstants.UNIQ_ID));
+ } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+ binBuffer = buildHBRspPackage(destAttrs,
+ (Byte)resultMap.get(ConfigConstants.VERSION_TYPE), 0);
+ } else {
+ // MsgType.MSG_ACK_SERVICE.equals(msgType)
+ // MsgType.MSG_ORIGINAL_RETURN.equals(msgType)
+ // MsgType.MSG_MULTI_BODY.equals(msgType)
+ // MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)
+ binBuffer = buildDefMsgRspPackage(msgType, destAttrs);
+ }
+ }
+ // send response message
+ if (remoteChannel.isWritable()) {
+ remoteChannel.writeAndFlush(binBuffer);
+ } else {
+ // release allocated ByteBuf
+ binBuffer.release();
+ strBuff.delete(0, strBuff.length());
+ strBuff.append("Send buffer is full2, so disconnect ")
+ .append(remoteChannel).append(", attr is ").append(origAttrs);
+ if (logCounter.shouldPrint()) {
+ logger.warn(strBuff.toString());
+ }
+ throw new Exception(strBuff.toString());
+ }
+ }
+
+ /**
+ * Build hearbeat(1)-msg response message ByteBuf
+ *
+ * @return ByteBuf
+ */
+ private static ByteBuf buildHeartBeatMsgRspPackage() {
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
+ // magic data
+ buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+ return buffer;
+ }
+
+ /**
+ * Build default-msg response message ByteBuf
*
- * @param backattrs
- * @param msgType message type
+ * @param msgType the message type
+ * @param attrs the return attribute
+ * @return ByteBuf
+ */
+ private static ByteBuf buildDefMsgRspPackage(MsgType msgType, String attrs) {
+ int attrsLen = 0;
+ int bodyLen = 0;
+ if (attrs != null) {
+ attrsLen = attrs.length();
+ }
+ // backTotalLen = mstType + bodyLen + body + attrsLen + attrs
+ int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
+ buffer.writeInt(backTotalLen);
+ buffer.writeByte(msgType.getValue());
+ buffer.writeInt(bodyLen);
+ buffer.writeInt(attrsLen);
+ if (attrsLen > 0) {
+ buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+ }
+ return buffer;
+ }
+
+ /**
+ * Build bin-msg response message ByteBuf
+ *
+ * @param attrs the return attribute
* @param sequenceId sequence Id
* @return ByteBuf
*/
- public static ByteBuf getResponsePackage(String backattrs, MsgType msgType, String sequenceId) {
+ public static ByteBuf buildBinMsgRspPackage(String attrs, String sequenceId) {
+ // calculate total length
+ // binTotalLen = mstType + uniq + attrsLen + attrs + magic
int binTotalLen = 1 + 4 + 2 + 2;
- if (null != backattrs) {
- binTotalLen += backattrs.length();
+ if (null != attrs) {
+ binTotalLen += attrs.length();
}
+ // allocate buffer and write fields
ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
- binBuffer.writeByte(msgType.getValue());
-
+ binBuffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
long uniqVal = Long.parseLong(sequenceId);
byte[] uniq = new byte[4];
uniq[0] = (byte) ((uniqVal >> 24) & 0xFF);
@@ -79,10 +214,46 @@ public class MessageUtils {
uniq[2] = (byte) ((uniqVal >> 8) & 0xFF);
uniq[3] = (byte) (uniqVal & 0xFF);
binBuffer.writeBytes(uniq);
+ if (null != attrs) {
+ binBuffer.writeShort(attrs.length());
+ binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
+ } else {
+ binBuffer.writeShort(0x0);
+ }
+ binBuffer.writeShort(0xee01);
+ return binBuffer;
+ }
- if (null != backattrs) {
- binBuffer.writeShort(backattrs.length());
- binBuffer.writeBytes(backattrs.getBytes(StandardCharsets.UTF_8));
+ /**
+ * Build heartbeat response message ByteBuf
+ *
+ * @param attrs the attribute string
+ * @param version the version
+ * @param loadValue the node load value
+ * @return ByteBuf
+ */
+ private static ByteBuf buildHBRspPackage(String attrs, byte version, int loadValue) {
+ // calculate total length
+ // binTotalLen = mstType + dataTime + version + bodyLen + body + attrsLen + attrs + magic
+ int binTotalLen = 1 + 4 + 1 + 4 + 2 + 2 + 2;
+ if (null != attrs) {
+ binTotalLen += attrs.length();
+ }
+ // check load value
+ if (loadValue == 0 || loadValue == (-1)) {
+ loadValue = 0xffff;
+ }
+ // allocate buffer and write fields
+ ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
+ binBuffer.writeInt(binTotalLen);
+ binBuffer.writeByte(MsgType.MSG_BIN_HEARTBEAT.getValue());
+ binBuffer.writeInt((int) (System.currentTimeMillis() / 1000));
+ binBuffer.writeByte(version);
+ binBuffer.writeInt(2);
+ binBuffer.writeShort(loadValue);
+ if (null != attrs) {
+ binBuffer.writeShort(attrs.length());
+ binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
} else {
binBuffer.writeShort(0x0);
}
@@ -104,8 +275,8 @@ public class MessageUtils {
}
}
if (logger.isDebugEnabled()) {
- logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}", groupId, streamId,
- topic);
+ logger.debug("Get topic by groupId = {}, streamId = {}, topic = {}",
+ groupId, streamId, topic);
}
return topic;
}