You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/16 02:07:37 UTC

[inlong] branch master updated: [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c5b9319a8 [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900)
c5b9319a8 is described below

commit c5b9319a8fc49ebddad92a4f8193073db7e3f5fc
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 16 10:07:32 2022 +0800

    [INLONG-5899][DataProxy] Optimize Http's SimpleMessageHandler class (#5900)
---
 .../dataproxy/http/SimpleMessageHandler.java       | 135 +++++++++++----------
 .../inlong/dataproxy/utils/DateTimeUtils.java      |  41 +++++++
 2 files changed, 109 insertions(+), 67 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 1969f7cea..3f1bf425c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -20,11 +20,6 @@ package org.apache.inlong.dataproxy.http;
 import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
 import javax.servlet.http.HttpServletRequest;
 import java.io.UnsupportedEncodingException;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
@@ -45,7 +40,7 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.ServiceDecoder;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,9 +48,6 @@ public class SimpleMessageHandler implements MessageHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageHandler.class);
     private static final ConfigManager configManager = ConfigManager.getInstance();
-    private static final DateTimeFormatter DATE_FORMATTER
-            = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
-    private static final ZoneId defZoneId = ZoneId.systemDefault();
 
     private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
     private final MonitorIndex monitorIndex;
@@ -88,96 +80,105 @@ public class SimpleMessageHandler implements MessageHandler {
 
     @Override
     public void processMessage(Context context) throws MessageProcessException {
-        String topicValue = "test";
-        String attr = "m=0";
-        StringBuilder newAttrBuffer = new StringBuilder(attr);
-
+        StringBuilder strBuff = new StringBuilder(512);
+        // get groupId and streamId
         String groupId = (String) context.get(AttributeConstants.GROUP_ID);
         String streamId = (String) context.get(AttributeConstants.STREAM_ID);
-        String dt = (String) context.get(AttributeConstants.DATA_TIME);
-
-        String value = getTopic(groupId, streamId);
-        if (null != value && !"".equals(value)) {
-            topicValue = value.trim();
+        if (StringUtils.isBlank(groupId) || StringUtils.isBlank(streamId)) {
+            throw new MessageProcessException(strBuff.append("Field ")
+                    .append(AttributeConstants.GROUP_ID).append(" or ")
+                    .append(AttributeConstants.STREAM_ID)
+                    .append(" must exist and not blank!").toString());
         }
-
-        String mxValue = configManager.getMxProperties().get(groupId);
-        if (null != mxValue) {
-            newAttrBuffer = new StringBuilder(mxValue.trim());
+        groupId = groupId.trim();
+        streamId = streamId.trim();
+        // get topicName
+        String topicName = "test";
+        String configedTopicName = getTopic(groupId, streamId);
+        if (StringUtils.isNotBlank(configedTopicName)) {
+            topicName = configedTopicName.trim();
         }
-
-        newAttrBuffer.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
-                .append("&dt=").append(dt);
+        // get message data time
+        final long msgRcvTime = System.currentTimeMillis();
+        String strDataTime = (String) context.get(AttributeConstants.DATA_TIME);
+        long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime);
+        strDataTime = String.valueOf(longDataTime);
+        // get char set
+        String charset = (String) context.get(AttributeConstants.CHARSET);
+        if (StringUtils.isBlank(charset)) {
+            charset = AttributeConstants.CHARSET;
+        }
+        String body = (String) context.get(AttributeConstants.BODY);
+        if (StringUtils.isEmpty(body)) {
+            throw new MessageProcessException(strBuff.append("Field ")
+                    .append(AttributeConstants.BODY)
+                    .append(" must exist and not empty!").toString());
+        }
+        // get m attribute
+        String mxValue = "m=0";
+        String configedMxAttr = configManager.getMxProperties().get(groupId);
+        if (StringUtils.isNotEmpty(configedMxAttr)) {
+            mxValue = configedMxAttr.trim();
+        }
+        // convert context to http request
         HttpServletRequest request =
                 (HttpServletRequest) context.get(AttributeConstants.HTTP_REQUEST);
+        // get report node ip
         String strRemoteIP = request.getRemoteAddr();
-        newAttrBuffer.append("&NodeIP=").append(strRemoteIP);
-        String msgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT);
-        if (msgCount == null || "".equals(msgCount)) {
-            msgCount = "1";
-        }
-
+        // get message count
+        String strMsgCount = request.getParameter(AttributeConstants.MESSAGE_COUNT);
+        int intMsgCnt = NumberUtils.toInt(strMsgCount, 1);
+        strMsgCount = String.valueOf(intMsgCnt);
+        // build message attributes
         InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
-        String charset = (String) context.get(AttributeConstants.CHARSET);
-        if (charset == null || "".equals(charset)) {
-            charset = "UTF-8";
-        }
-        String body = (String) context.get(AttributeConstants.BODY);
+        strBuff.append(mxValue).append("&groupId=").append(groupId)
+                .append("&streamId=").append(streamId)
+                .append("&dt=").append(strDataTime)
+                .append("&NodeIP=").append(strRemoteIP)
+                .append("&cnt=").append(strMsgCount)
+                .append("&rt=").append(msgRcvTime);
         try {
-            inLongMsg.addMsg(newAttrBuffer.toString(), body.getBytes(charset));
+            inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
+            strBuff.delete(0, strBuff.length());
         } catch (UnsupportedEncodingException e) {
             throw new MessageProcessException(e);
         }
-
+        // build flume event
         Map<String, String> headers = new HashMap<>();
-        headers.put(AttributeConstants.DATA_TIME, dt);
-        headers.put(ConfigConstants.TOPIC_KEY, topicValue);
+        headers.put(AttributeConstants.GROUP_ID, groupId);
         headers.put(AttributeConstants.STREAM_ID, streamId);
+        headers.put(ConfigConstants.TOPIC_KEY, topicName);
+        headers.put(AttributeConstants.DATA_TIME, strDataTime);
         headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
         headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
-        headers.put(ConfigConstants.MSG_COUNTER_KEY, msgCount);
+        headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
         byte[] data = inLongMsg.buildArray();
         headers.put(ConfigConstants.TOTAL_LEN, String.valueOf(data.length));
-        // add msgTime if not existed
-        long currTIme = System.currentTimeMillis();
-        String strMsgTime = request.getParameter(Constants.HEADER_KEY_MSG_TIME);
-        long pkgTimeInMillis = NumberUtils.toLong(strMsgTime, currTIme);
-        LocalDateTime localDateTime =
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(pkgTimeInMillis), defZoneId);
-        String pkgTime = DATE_FORMATTER.format(localDateTime);
-        headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(pkgTimeInMillis));
-        headers.put(ConfigConstants.PKG_TIME_KEY, pkgTime);
+        headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         Event event = EventBuilder.withBody(data, headers);
         inLongMsg.reset();
-        long dtten;
-        try {
-            dtten = Long.parseLong(dt);
-        } catch (NumberFormatException e1) {
-            throw new MessageProcessException(new Throwable(
-                    "attribute dt=" + dt + " has error," + " detail is: " + newAttrBuffer));
-        }
-        dtten = dtten / 1000 / 60 / 10;
-        dtten = dtten * 1000 * 60 * 10;
-        StringBuilder newBase = new StringBuilder();
-        newBase.append("http").append(SEP_HASHTAG).append(topicValue).append(SEP_HASHTAG)
+        // build metric data item
+        longDataTime = longDataTime / 1000 / 60 / 10;
+        longDataTime = longDataTime * 1000 * 60 * 10;
+        strBuff.append("http").append(SEP_HASHTAG).append(topicName).append(SEP_HASHTAG)
                 .append(streamId).append(SEP_HASHTAG).append(strRemoteIP).append(SEP_HASHTAG)
                 .append(NetworkUtils.getLocalIp()).append(SEP_HASHTAG)
                 .append("non-order").append(SEP_HASHTAG)
-                .append(new SimpleDateFormat("yyyyMMddHHmm").format(dtten)).append(SEP_HASHTAG)
-                .append(pkgTime);
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(SEP_HASHTAG)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
         long beginTime = System.currentTimeMillis();
         try {
             processor.processEvent(event);
             if (monitorIndex != null) {
-                monitorIndex.addAndGet(new String(newBase),
-                        Integer.parseInt(msgCount), 1, data.length, 0);
+                monitorIndex.addAndGet(strBuff.toString(),
+                        intMsgCnt, 1, data.length, 0);
                 monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
             }
             addMetric(true, data.length, event);
         } catch (ChannelException ex) {
             if (monitorIndex != null) {
-                monitorIndex.addAndGet(new String(newBase),
-                        0, 0, 0, Integer.parseInt(msgCount));
+                monitorIndex.addAndGet(strBuff.toString(),
+                        0, 0, 0, intMsgCnt);
                 monitorIndexExt.incrementAndGet("EVENT_DROPPED");
             }
             addMetric(false, data.length, event);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java
new file mode 100644
index 000000000..222e343c8
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/DateTimeUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+public class DateTimeUtils {
+    private static final DateTimeFormatter DATE_FORMATTER
+            = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final ZoneId defZoneId = ZoneId.systemDefault();
+
+    /**
+     * Converts an epoch-millisecond timestamp to a 'yyyyMMddHHmm' string, using
+     * the system default time zone captured when this class is initialized.
+     * @param timestamp The time to format, in milliseconds since the Unix epoch
+     * @return the time string in yyyyMMddHHmm format
+     */
+    public static String ms2yyyyMMddHHmm(long timestamp) {
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), defZoneId);
+        return DATE_FORMATTER.format(localDateTime);
+    }
+}