You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:00:16 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new b5c0046cd [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900)
b5c0046cd is described below
commit b5c0046cd1c847782b0bc25d1f48878052da9e41
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 16 10:07:32 2022 +0800
[INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900)
---
.../dataproxy/http/SimpleMessageHandler.java | 135 +++++++++++----------
.../inlong/dataproxy/utils/DateTimeUtils.java | 41 +++++++
2 files changed, 109 insertions(+), 67 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 1969f7cea..3f1bf425c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -20,11 +20,6 @@ package org.apache.inlong.dataproxy.http;
import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
import javax.servlet.http.HttpServletRequest;
import java.io.UnsupportedEncodingException;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -45,7 +40,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,9 +48,6 @@ public class SimpleMessageHandler implements MessageHandler {
private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class);
private static final ConfigManager configManager = ConfigManager.getInstance();
- private static final DateTimeFormatter DATE_FORMATTER
- = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
- private static final ZoneId defZoneId = ZoneId.systemDefault();
private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
private final MonitorIndex monitorIndex;
@@ -88,96 +80,105 @@ public class SimpleMessageHandler implements MessageHandler {
@Override
public void processMessage(Context context) throws MessageProcessException {
- String topicValue = "test";
- String attr = "m=0";
- StringBuilder newAttrBuffer = new StringBuilder(attr);
-
+ StringBuilder strBuff = new StringBuilder(512);
+ // get groupId and streamId
String groupId = (String) context.get(AttributeConstants.GROUP_ID);
String streamId = (String) context.get(AttributeConstants.STREAM_ID);
- String dt = (String) context.get(AttributeConstants.DATA_TIME);
-
- String value = getTopic(groupId, streamId);
- if (null != value && !"".equals(value)) {
- topicValue = value.trim();
+ if (StringUtils.isBlank(groupId) || StringUtils.isBlank(streamId)) {
+ throw new MessageProcessException(strBuff.append("Field ")
+ .append(AttributeConstants.GROUP_ID).append(" or ")
+ .append(AttributeConstants.STREAM_ID)
+ .append(" must exist and not blank!").toString());
}
-
- String mxValue = configManager.getMxProperties().get(groupId);
- if (null != mxValue) {
- newAttrBuffer = new StringBuilder(mxValue.trim());
+ groupId = groupId.trim();
+ streamId = streamId.trim();
+ // get topicName
+ String topicName = "test";
+ String configedTopicName = getTopic(groupId, streamId);
+ if (StringUtils.isNotBlank(configedTopicName)) {
+ topicName = configedTopicName.trim();
}
-
- newAttrBuffer.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
- .append("&dt=").append(dt);
+ // get message data time
+ final long msgRcvTime = System.currentTimeMillis();
+ String strDataTime = (String) context.get(AttributeConstants.DATA_TIME);
+ long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime);
+ strDataTime = String.valueOf(longDataTime);
+ // get char set
+ String charset = (String) context.get(AttributeConstants.CHARSET);
+ if (StringUtils.isBlank(charset)) {
+ charset = AttributeConstants.CHARSET;
+ }
+ String body = (String) context.get(AttributeConstants.BODY);
+ if (StringUtils.isEmpty(body)) {
+ throw new MessageProcessException(strBuff.append("Field ")
+ .append(AttributeConstants.BODY)
+ .append(" must exist and not empty!").toString());
+ }
+ // get m attribute
+ String mxValue = "m=0";
+ String configedMxAttr = configManager.getMxProperties().get(groupId);
+ if (StringUtils.isNotEmpty(configedMxAttr)) {
+ mxValue = configedMxAttr.trim();
+ }
+ // convert context to http request
HttpServletRequest request =
(HttpServletRequest) context.get(AttributeConstants.HTTP_REQUEST);
+ // get report node ip
String strRemoteIP = request.getRemoteAddr();
- newAttrBuffer.append("&NodeIP=").append(strRemoteIP);
- String msgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT);
- if (msgCount == null || "".equals(msgCount)) {
- msgCount = "1";
- }
-
+ // get message count
+ String strMsgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT);
+ int intMsgCnt = NumberUtils.toInt(strMsgCount, 1);
+ strMsgCount = String.valueOf(intMsgCnt);
+ // build message attributes
InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
- String charset = (String) context.get(AttributeConstants.CHARSET);
- if (charset == null || "".equals(charset)) {
- charset = "UTF-8";
- }
- String body = (String) context.get(AttributeConstants.BODY);
+ strBuff.append(mxValue).append("&groupId=").append(groupId)
+ .append("&streamId=").append(streamId)
+ .append("&dt=").append(strDataTime)
+ .append("&NodeIP=").append(strRemoteIP)
+ .append("&cnt=").append(strMsgCount)
+ .append("&rt=").append(msgRcvTime);
try {
- inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
+ inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
+ strBuff.delete(0, strBuff.length());
} catch (UnsupportedEncodingException e) {
throw new MessageProcessException(e);
}
-
+ // build flume event
Map<String, String> headers = new HashMap<>();
- headers.put(AttributeConstants.DATA_TIME, dt);
- headers.put(ConfigConstants.TOPIC_KEY, topicValue);
+ headers.put(AttributeConstants.GROUP_ID, groupId);
headers.put(AttributeConstants.STREAM_ID, streamId);
+ headers.put(ConfigConstants.TOPIC_KEY, topicName);
+ headers.put(AttributeConstants.DATA_TIME, strDataTime);
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
- headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
+ headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
byte[] data = inLongMsg.buildArray();
headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
- // add msgTime if not existed
- long currTIme = System.currentTimeMillis();
- String strMsgTime = request.getParameter(Constants.HEADER_KEY_MSG_TIME);
- long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
- LocalDateTime localDateTime =
- LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
- String pkgTime = DATE_FORMATTER.format(localDateTime);
- headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
- headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
+ headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
Event event = EventBuilder.withBody(data, headers);
inLongMsg.reset();
- long dtten;
- try {
- dtten = Long.parseLong(dt);
- } catch (NumberFormatException e1) {
- throw new MessageProcessException(new Throwable(
- "attribute dt=" + dt + " has error," + " detail is: " + newAttrBuffer));
- }
- dtten = dtten / 1000 / 60 / 10;
- dtten = dtten * 1000 * 60 * 10;
- StringBuilder newBase = new StringBuilder();
- newBase.append("http").append(SEP_HASHTAG).append(topicValue).append(SEP_HASHTAG)
+ // build metric data item
+ longDataTime = longDataTime / 1000 / 60 / 10;
+ longDataTime = longDataTime * 1000 * 60 * 10;
+ strBuff.append("http").append(SEP_HASHTAG).append(topicName).append(SEP_HASHTAG)
.append(streamId).append(SEP_HASHTAG).append(strRemoteIP).append(SEP_HASHTAG)
.append(NetworkUtils.getLocalIp()).append(SEP_HASHTAG)
.append("non-order").append(SEP_HASHTAG)
- .append(new SimpleDateFormat("yyyyMMddHHmm").format(dtten)).append(SEP_HASHTAG)
- .append(pkgTime);
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEP_HASHTAG)
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
long beginTime = System.currentTimeMillis();
try {
processor.processEvent(event);
if (monitorIndex != null) {
- monitorIndex.addAndGet(new String(newBase),
- Integer.parseInt(msgCount), 1, data.length, 0);
+ monitorIndex.addAndGet(strBuff.toString(),
+ intMsgCnt, 1, data.length, 0);
monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
}
addMetric(true, data.length, event);
} catch (ChannelException ex) {
if (monitorIndex != null) {
- monitorIndex.addAndGet(new String(newBase),
- 0, 0, 0, Integer.parseInt(msgCount));
+ monitorIndex.addAndGet(strBuff.toString(),
+ 0, 0, 0, intMsgCnt);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
}
addMetric(false, data.length, event);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java
new file mode 100644
index 000000000..222e343c8
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+public class DateTimeUtils {
+ private static final DateTimeFormatter DATE_FORMATTER
+ = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+ private static final ZoneId defZoneId = ZoneId.systemDefault();
+
+ /**
+ * convert ms value to 'yyyyMMddHHmm' string
+ *
+ * @param timestamp The millisecond value of the specified time
+ * @return the time string in yyyyMMddHHmm format
+ */
+ public static String ms2yyyyMMddHHmm(long timestamp) {
+ LocalDateTime localDateTime =
+ LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), defZoneId);
+ return DATE_FORMATTER.format(localDateTime);
+ }
+}