You are viewing a plain-text version of this content; the link to its canonical version ("here") is not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/10 04:30:19 UTC

[inlong] branch master updated: [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e4e8a36e [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)
4e4e8a36e is described below

commit 4e4e8a36ea24285a9e2e5e11e759e4dca0cf0d95
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Sat Sep 10 12:30:13 2022 +0800

    [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)
---
 .../sdk/sort/impl/decode/MessageDeserializer.java  |  96 +++++++++++-
 .../apache/inlong/sdk/sort/util/StringUtil.java    | 169 +++++++++++++++++++++
 .../sort/impl/decode/MessageDeserializerTest.java  |  43 +++++-
 3 files changed, 297 insertions(+), 11 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index 145b7ad77..b0c54663e 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -23,8 +23,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -32,12 +37,14 @@ import org.apache.inlong.sdk.sort.api.ClientContext;
 import org.apache.inlong.sdk.sort.api.Deserializer;
 import org.apache.inlong.sdk.sort.entity.InLongMessage;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.util.StringUtil;
 import org.apache.inlong.sdk.sort.util.Utils;
 
 public class MessageDeserializer implements Deserializer {
 
     private static final int MESSAGE_VERSION_NONE = 0;
     private static final int MESSAGE_VERSION_PB = 1;
+    private static final int MESSAGE_VERSION_INLONG_MSG = 2;
     private static final int COMPRESS_TYPE_NONE = 0;
     private static final int COMPRESS_TYPE_GZIP = 1;
     private static final int COMPRESS_TYPE_SNAPPY = 2;
@@ -48,15 +55,29 @@ public class MessageDeserializer implements Deserializer {
     private static final String INLONG_GROUPID_KEY = "inlongGroupId";
     private static final String INLONG_STREAMID_KEY = "inlongStreamId";
 
+    // Keys looked up in each InLongMsg attribute string after it is split by StringUtil.splitKv.
+    private static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+    private static final String INLONGMSG_ATTR_GROUP_ID = "groupId";
+    // Message time as a date-time string parsed by StringUtil.parseDateTime; checked before "dt".
+    private static final String INLONGMSG_ATTR_TIME_T = "t";
+    // Message time as a numeric epoch value parsed with Long.parseLong
+    // (the unit test supplies System.currentTimeMillis()).
+    private static final String INLONGMSG_ATTR_TIME_DT = "dt";
+    // Source IP of the decoded messages; DEFAULT_IP is used when this attribute is absent.
+    private static final String INLONGMSG_ATTR_NODE_IP = "NodeIP";
+    // Delimiters passed to StringUtil.splitKv: entries are separated by '&', key and value by '='.
+    private static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&';
+    private static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
+    private static final String DEFAULT_IP = "127.0.0.1";
+
+    // Message of the IllegalArgumentException raised for a missing required attribute;
+    // %s is replaced by the attribute name(s).
+    private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!";
+
     public MessageDeserializer() {
     }
 
     @Override
-    public List<InLongMessage> deserialize(ClientContext context, InLongTopic inLongTopic, Map<String, String> headers,
+    public List<InLongMessage> deserialize(
+            ClientContext context,
+            InLongTopic inLongTopic,
+            Map<String, String> headers,
             byte[] data) throws Exception {
 
         //1. version
-        int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, "0"));
+        int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, Integer.toString(MESSAGE_VERSION_INLONG_MSG)));
         switch (version) {
             case MESSAGE_VERSION_NONE: {
                 return decode(context, inLongTopic, data, headers);
@@ -64,12 +85,18 @@ public class MessageDeserializer implements Deserializer {
             case MESSAGE_VERSION_PB: {
                 return decodePB(context, inLongTopic, data, headers);
             }
+            case MESSAGE_VERSION_INLONG_MSG: {
+                return decodeInlongMsg(context, inLongTopic, data, headers);
+            }
             default:
                 throw new IllegalArgumentException("Unknown version type:" + version);
         }
     }
 
-    private List<InLongMessage> decode(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes,
+    private List<InLongMessage> decode(
+            ClientContext context,
+            InLongTopic inLongTopic,
+            byte[] msgBytes,
             Map<String, String> headers) {
         long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
         String sourceIp = headers.getOrDefault(SOURCE_IP_KEY, "");
@@ -89,7 +116,10 @@ public class MessageDeserializer implements Deserializer {
      * @param msgBytes byte[]
      * @return {@link MessageObjs}
      */
-    private List<InLongMessage> decodePB(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes,
+    private List<InLongMessage> decodePB(
+            ClientContext context,
+            InLongTopic inLongTopic,
+            byte[] msgBytes,
             Map<String, String> headers) throws IOException {
         int compressType = Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0"));
         String inlongGroupId = headers.getOrDefault(INLONG_GROUPID_KEY, "");
@@ -120,7 +150,8 @@ public class MessageDeserializer implements Deserializer {
      * @param messageObjs {@link MessageObjs}
      * @return {@link List}
      */
-    private List<InLongMessage> transformMessageObjs(ClientContext context, InLongTopic inLongTopic,
+    private List<InLongMessage> transformMessageObjs(
+            ClientContext context, InLongTopic inLongTopic,
             MessageObjs messageObjs, String inlongGroupId,
             String inlongStreamId) {
         if (null == messageObjs) {
@@ -144,4 +175,59 @@ public class MessageDeserializer implements Deserializer {
         }
         return inLongMessages;
     }
+
+    /**
+     * Decodes an InLongMsg payload (message version 2) into {@link InLongMessage}s.
+     *
+     * <p>Every attribute string of the InLongMsg is split into key/value pairs. It must carry
+     * {@code groupId}, {@code streamId} and either {@code t} (date-time string) or {@code dt}
+     * (numeric epoch value); {@code t} wins when both are present. {@code NodeIP} is optional
+     * and defaults to 127.0.0.1. One message is emitted per non-null body stored under that
+     * attribute, and each body's length is added to the topic's decompression-consume-size
+     * statistic.</p>
+     *
+     * @param context client context providing the sort task config and the stat manager
+     * @param inLongTopic topic the bytes were consumed from; used to select the statistics
+     * @param msgBytes serialized InLongMsg
+     * @param headers message headers; not used by this decoder
+     * @return the decoded messages, in attribute order and then body order
+     * @throws IllegalArgumentException if the bytes cannot be parsed as an InLongMsg, or a
+     *         required attribute is missing or malformed
+     */
+    private List<InLongMessage> decodeInlongMsg(
+            ClientContext context,
+            InLongTopic inLongTopic,
+            byte[] msgBytes,
+            Map<String, String> headers) {
+        InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
+        if (inLongMsg == null) {
+            // NOTE(review): assumes parseFrom reports unrecognised bytes by returning null rather
+            // than throwing; this replaces the NPE on getAttrs() with a descriptive error.
+            throw new IllegalArgumentException("Could not parse data as InLongMsg!");
+        }
+
+        List<InLongMessage> messageList = new ArrayList<>();
+        for (String attr : inLongMsg.getAttrs()) {
+            Map<String, String> attributes = StringUtil.splitKv(attr, INLONGMSG_ATTR_ENTRY_DELIMITER,
+                    INLONGMSG_ATTR_KV_DELIMITER, null, null);
+
+            String groupId = requireAttr(attributes, INLONGMSG_ATTR_GROUP_ID);
+            String streamId = requireAttr(attributes, INLONGMSG_ATTR_STREAM_ID);
+            long msgTime = parseMsgTime(attributes);
+            String srcIp = attributes.getOrDefault(INLONGMSG_ATTR_NODE_IP, DEFAULT_IP);
+
+            Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
+            while (iterator.hasNext()) {
+                byte[] bodyBytes = iterator.next();
+                if (Objects.isNull(bodyBytes)) {
+                    continue;
+                }
+                // NOTE(review): all messages decoded from the same attribute string share this one
+                // mutable attributes map; a consumer mutating it affects its siblings.
+                InLongMessage inLongMessage = new InLongMessage(groupId, streamId, msgTime,
+                        srcIp, bodyBytes, attributes);
+                messageList.add(inLongMessage);
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addDecompressionConsumeSize(inLongMessage.getBody().length);
+            }
+        }
+        return messageList;
+    }
+
+    /**
+     * Returns the value of a required InLongMsg attribute.
+     *
+     * @throws IllegalArgumentException formatted with PARSE_ATTR_ERROR_STRING if the key is absent
+     */
+    private static String requireAttr(Map<String, String> attributes, String key) {
+        return Optional.ofNullable(attributes.get(key))
+                .orElseThrow(() -> new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING, key)));
+    }
+
+    /**
+     * Extracts the message time: the {@code t} date-time string is preferred over the
+     * {@code dt} epoch value; one of the two must be present.
+     *
+     * @throws IllegalArgumentException if neither attribute is present or the value is malformed
+     */
+    private static long parseMsgTime(Map<String, String> attributes) {
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            return StringUtil.parseDateTime(attributes.get(INLONGMSG_ATTR_TIME_T).trim());
+        }
+        if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            return Long.parseLong(attributes.get(INLONGMSG_ATTR_TIME_DT).trim());
+        }
+        throw new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
+                INLONGMSG_ATTR_TIME_T + " or " + INLONGMSG_ATTR_TIME_DT));
+    }
+
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
index 6bacc8c22..2a8774255 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
@@ -17,12 +17,151 @@
 
 package org.apache.inlong.sdk.sort.util;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 public class StringUtil {
 
+    // States of the splitKv parser. splitKv uses each value as a single state (never combined as
+    // bit flags). NOTE(review): STATE_NORMAL is not referenced by splitKv; confirm it is needed.
+    private static final int STATE_NORMAL = 0;
+    private static final int STATE_KEY = 2;
+    private static final int STATE_VALUE = 4;
+    private static final int STATE_ESCAPING = 8;
+    private static final int STATE_QUOTING = 16;
+
+    /**
+     * Splits a delimited key/value text (for example {@code k1=v1&k2=v2}) into a map.
+     *
+     * <p>Both escaping and quoting are supported. When {@code escapeChar} is non-null, the
+     * character immediately following it is taken literally. When {@code quoteChar} is
+     * non-null, all characters between a pair of quote characters are taken literally and the
+     * quote characters themselves are dropped. If a key occurs more than once, the last value
+     * wins.</p>
+     *
+     * @param text The text to be split.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escaping character, or {@code null} to disable escaping.
+     * @param quoteChar The quoting character, or {@code null} to disable quoting.
+     * @return The fields split from the text.
+     * @throws IllegalArgumentException if a delimiter appears where it is not allowed, the text
+     *         ends while reading a key (including empty text), or an escape/quote is unterminated.
+     */
+    @SuppressWarnings("checkstyle:MissingSwitchDefault")
+    public static Map<String, String> splitKv(
+            @Nonnull String text,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar
+    ) {
+        Map<String, String> fields = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        String key = "";
+        int state = STATE_KEY;
+
+        /*
+         * The state when entering escaping and quoting. When we exit escaping
+         * or quoting, we should restore this state.
+         */
+        int kvState = STATE_KEY;
+
+        for (int i = 0; i < text.length(); ++i) {
+            char ch = text.charAt(i);
+
+            if (state == STATE_ESCAPING) {
+                // Whatever follows the escape character (a delimiter, quote, escape or an ordinary
+                // character) is taken literally and ends the escape sequence.
+                stringBuilder.append(ch);
+                state = kvState;
+            } else if (ch == kvDelimiter) {
+                switch (state) {
+                    case STATE_KEY:
+                        key = stringBuilder.toString();
+                        stringBuilder.setLength(0);
+                        state = STATE_VALUE;
+                        break;
+                    case STATE_VALUE:
+                        throw new IllegalArgumentException("Unexpected token " + ch + " at position " + i + ".");
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                }
+            } else if (ch == entryDelimiter) {
+                switch (state) {
+                    case STATE_KEY:
+                        throw new IllegalArgumentException("Unexpected token " + ch + " at position " + i + ".");
+                    case STATE_VALUE:
+                        fields.put(key, stringBuilder.toString());
+                        stringBuilder.setLength(0);
+                        state = STATE_KEY;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                }
+            } else if (escapeChar != null && ch == escapeChar) {
+                switch (state) {
+                    case STATE_KEY:
+                    case STATE_VALUE:
+                        kvState = state;
+                        state = STATE_ESCAPING;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                }
+            } else if (quoteChar != null && ch == quoteChar) {
+                switch (state) {
+                    case STATE_KEY:
+                    case STATE_VALUE:
+                        kvState = state;
+                        state = STATE_QUOTING;
+                        break;
+                    case STATE_QUOTING:
+                        state = kvState;
+                        break;
+                }
+            } else {
+                stringBuilder.append(ch);
+            }
+        }
+
+        switch (state) {
+            case STATE_KEY:
+                throw new IllegalArgumentException("Dangling key.");
+            case STATE_VALUE:
+                fields.put(key, stringBuilder.toString());
+                return fields;
+            case STATE_ESCAPING:
+                throw new IllegalArgumentException("Not closed escaping.");
+            case STATE_QUOTING:
+                throw new IllegalArgumentException("Not closed quoting.");
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
     /**
      * formatDate
      *
@@ -106,4 +245,34 @@ public class StringUtil {
         return sdf.format(date);
     }
 
+    /**
+     * Parses a date-time string into epoch milliseconds.
+     * Only <b>yyyyMMdd</b>, <b>yyyyMMddHH</b> and <b>yyyyMMddHHmm</b> precisions are supported,
+     * whose lengths are 8, 10 and 12 respectively. A length of 9 or 11 is parsed at the next
+     * lower precision, characters beyond the 12th are ignored, and the value is interpreted in
+     * the JVM default time zone. Parsing is strict, so impossible dates such as 20221399 are
+     * rejected instead of being rolled over.
+     *
+     * @param value Date time in string format; must not be null.
+     * @return Epoch milliseconds, or -1 if the value is shorter than 8 characters.
+     * @throws IllegalArgumentException if the value does not match the expected pattern.
+     */
+    public static long parseDateTime(String value) {
+        if (value.length() < 8) {
+            return -1;
+        }
+        String pattern;
+        if (value.length() <= 9) {
+            pattern = "yyyyMMdd";
+        } else if (value.length() <= 11) {
+            pattern = "yyyyMMddHH";
+        } else {
+            pattern = "yyyyMMddHHmm";
+        }
+        try {
+            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(pattern);
+            simpleDateFormat.setLenient(false);
+            return simpleDateFormat.parse(value.substring(0, pattern.length())).getTime();
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Unexpected time format : " + value, e);
+        }
+    }
+
 }
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
index 04307336a..815f1047c 100644
--- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -79,24 +80,27 @@ public class MessageDeserializerTest {
 
     @Test
     public void testDeserialize() {
-        //1. setUp
+        // 1. setUp
         try {
             setUp();
         } catch (Exception e) {
             e.printStackTrace();
         }
 
-        //2. testDeserializeVersion0
+        // 2. testDeserializeVersion0
         testDeserializeVersion0();
 
-        //3. testDeserializeVersion1CompressionType0
+        // 3. testDeserializeVersion1CompressionType0
         testDeserializeVersion1CompressionType0();
 
-        //4. testDeserializeVersion1CompressionType1
+        // 4. testDeserializeVersion1CompressionType1
         testDeserializeVersion1CompressionType1();
 
-        //5. testDeserializeVersion1CompressionType2
+        // 5. testDeserializeVersion1CompressionType2
         testDeserializeVersion1CompressionType2();
+
+        // 6. DeserializeVersion2NoCompress
+        testDeserializeVersion2NoCompress();
     }
 
     private void testDeserializeVersion0() {
@@ -165,6 +169,33 @@ public class MessageDeserializerTest {
         }
     }
 
+    /**
+     * Round-trips ten identical bodies through an uncompressed InLongMsg (header version 2) and
+     * checks that all of them are decoded with the original body text.
+     */
+    private void testDeserializeVersion2NoCompress() {
+        try {
+            String groupId = "sort_sdk_test_group_id";
+            String streamId = "sort_sdk_test_stream_id";
+            String ip = "1.2.3.4";
+            long dt = System.currentTimeMillis();
+            String attr = "m=0&groupId=" + groupId + "&streamId=" + streamId
+                    + "&dt=" + dt + "&NodeIP=" + ip;
+            String msg = "sort sdk inlong msg test";
+
+            InLongMsg inlongMsg = InLongMsg.newInLongMsg(false);
+            for (int i = 0; i < 10; i++) {
+                inlongMsg.addMsg(attr, msg.getBytes());
+            }
+            Map<String, String> header = new HashMap<>();
+            header.put("version", "2");
+            List<InLongMessage> deserialize = messageDeserializer
+                    .deserialize(context, inLongTopic, header, inlongMsg.buildArray());
+            Assert.assertEquals(10, deserialize.size());
+            Assert.assertEquals(msg, new String(deserialize.get(0).getBody()));
+        } catch (Exception e) {
+            // Catch only Exception so that Assert failures (AssertionError) propagate, and turn any
+            // unexpected exception into a test failure that keeps its cause.
+            throw new AssertionError("Failed to deserialize InLongMsg", e);
+        }
+    }
+
     private void prepareTestMessageObjs() {
         headers.put("version", "1");
         testData = "test data";
@@ -183,4 +214,4 @@ public class MessageDeserializerTest {
 
         messageObjs = MessageObjs.newBuilder().addMsgs(messageObj1).addMsgs(messageObj2).build();
     }
-}
\ No newline at end of file
+}