You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/10 04:30:19 UTC
[inlong] branch master updated: [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4e4e8a36e [INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)
4e4e8a36e is described below
commit 4e4e8a36ea24285a9e2e5e11e759e4dca0cf0d95
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Sat Sep 10 12:30:13 2022 +0800
[INLONG-5808][SDK] Sort SDK supports parse InlongMsg (#5809)
---
.../sdk/sort/impl/decode/MessageDeserializer.java | 96 +++++++++++-
.../apache/inlong/sdk/sort/util/StringUtil.java | 169 +++++++++++++++++++++
.../sort/impl/decode/MessageDeserializerTest.java | 43 +++++-
3 files changed, 297 insertions(+), 11 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index 145b7ad77..b0c54663e 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -23,8 +23,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -32,12 +37,14 @@ import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.Deserializer;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.inlong.sdk.sort.util.Utils;
public class MessageDeserializer implements Deserializer {
private static final int MESSAGE_VERSION_NONE = 0;
private static final int MESSAGE_VERSION_PB = 1;
+ private static final int MESSAGE_VERSION_INLONG_MSG = 2;
private static final int COMPRESS_TYPE_NONE = 0;
private static final int COMPRESS_TYPE_GZIP = 1;
private static final int COMPRESS_TYPE_SNAPPY = 2;
@@ -48,15 +55,29 @@ public class MessageDeserializer implements Deserializer {
private static final String INLONG_GROUPID_KEY = "inlongGroupId";
private static final String INLONG_STREAMID_KEY = "inlongStreamId";
+ private static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+ private static final String INLONGMSG_ATTR_GROUP_ID = "groupId";
+ private static final String INLONGMSG_ATTR_TIME_T = "t";
+ private static final String INLONGMSG_ATTR_TIME_DT = "dt";
+ private static final String INLONGMSG_ATTR_NODE_IP = "NodeIP";
+ private static final char INLONGMSG_ATTR_ENTRY_DELIMITER = '&';
+ private static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
+ private static final String DEFAULT_IP = "127.0.0.1";
+
+ private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!";
+
public MessageDeserializer() {
}
@Override
- public List<InLongMessage> deserialize(ClientContext context, InLongTopic inLongTopic, Map<String, String> headers,
+ public List<InLongMessage> deserialize(
+ ClientContext context,
+ InLongTopic inLongTopic,
+ Map<String, String> headers,
byte[] data) throws Exception {
//1. version
- int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, "0"));
+ int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY, Integer.toString(MESSAGE_VERSION_INLONG_MSG)));
switch (version) {
case MESSAGE_VERSION_NONE: {
return decode(context, inLongTopic, data, headers);
@@ -64,12 +85,18 @@ public class MessageDeserializer implements Deserializer {
case MESSAGE_VERSION_PB: {
return decodePB(context, inLongTopic, data, headers);
}
+ case MESSAGE_VERSION_INLONG_MSG: {
+ return decodeInlongMsg(context, inLongTopic, data, headers);
+ }
default:
throw new IllegalArgumentException("Unknown version type:" + version);
}
}
- private List<InLongMessage> decode(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes,
+ private List<InLongMessage> decode(
+ ClientContext context,
+ InLongTopic inLongTopic,
+ byte[] msgBytes,
Map<String, String> headers) {
long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
String sourceIp = headers.getOrDefault(SOURCE_IP_KEY, "");
@@ -89,7 +116,10 @@ public class MessageDeserializer implements Deserializer {
* @param msgBytes byte[]
* @return {@link MessageObjs}
*/
- private List<InLongMessage> decodePB(ClientContext context, InLongTopic inLongTopic, byte[] msgBytes,
+ private List<InLongMessage> decodePB(
+ ClientContext context,
+ InLongTopic inLongTopic,
+ byte[] msgBytes,
Map<String, String> headers) throws IOException {
int compressType = Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0"));
String inlongGroupId = headers.getOrDefault(INLONG_GROUPID_KEY, "");
@@ -120,7 +150,8 @@ public class MessageDeserializer implements Deserializer {
* @param messageObjs {@link MessageObjs}
* @return {@link List}
*/
- private List<InLongMessage> transformMessageObjs(ClientContext context, InLongTopic inLongTopic,
+ private List<InLongMessage> transformMessageObjs(
+ ClientContext context, InLongTopic inLongTopic,
MessageObjs messageObjs, String inlongGroupId,
String inlongStreamId) {
if (null == messageObjs) {
@@ -144,4 +175,59 @@ public class MessageDeserializer implements Deserializer {
}
return inLongMessages;
}
+
+    /**
+     * Decodes a payload in InLongMsg format into a list of {@link InLongMessage}.
+     *
+     * <p>Every attribute string of the parsed InLongMsg is split into key-value pairs using
+     * '&amp;' between entries and '=' between key and value. The pairs must contain "groupId",
+     * "streamId" and a time: either "t" (a date time string handed to
+     * {@link StringUtil#parseDateTime(String)}) or "dt" (handed to {@link Long#parseLong(String)}),
+     * with "t" taking precedence. "NodeIP" is optional and defaults to 127.0.0.1.</p>
+     *
+     * @param context client context, used to record the consumed body size
+     * @param inLongTopic topic the payload was read from
+     * @param msgBytes raw payload
+     * @param headers message headers; not read by this method
+     * @return one message per non-null body found in the payload
+     * @throws IllegalArgumentException if a required attribute is missing
+     */
+    private List<InLongMessage> decodeInlongMsg(
+            ClientContext context,
+            InLongTopic inLongTopic,
+            byte[] msgBytes,
+            Map<String, String> headers) {
+        final List<InLongMessage> decoded = new ArrayList<>();
+        final InLongMsg parsed = InLongMsg.parseFrom(msgBytes);
+
+        for (String attr : parsed.getAttrs()) {
+            final Map<String, String> attrMap = StringUtil.splitKv(attr, INLONGMSG_ATTR_ENTRY_DELIMITER,
+                    INLONGMSG_ATTR_KV_DELIMITER, null, null);
+
+            // Mandatory identifiers, checked in the order group id, then stream id.
+            final String groupId = attrMap.get(INLONGMSG_ATTR_GROUP_ID);
+            if (groupId == null) {
+                throw new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
+                        INLONGMSG_ATTR_GROUP_ID));
+            }
+            final String streamId = attrMap.get(INLONGMSG_ATTR_STREAM_ID);
+            if (streamId == null) {
+                throw new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
+                        INLONGMSG_ATTR_STREAM_ID));
+            }
+
+            // Message time: "t" wins over "dt" when both are present.
+            final long msgTime;
+            if (attrMap.containsKey(INLONGMSG_ATTR_TIME_T)) {
+                msgTime = StringUtil.parseDateTime(attrMap.get(INLONGMSG_ATTR_TIME_T).trim());
+            } else if (attrMap.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+                msgTime = Long.parseLong(attrMap.get(INLONGMSG_ATTR_TIME_DT).trim());
+            } else {
+                throw new IllegalArgumentException(String.format(PARSE_ATTR_ERROR_STRING,
+                        INLONGMSG_ATTR_TIME_T + " or " + INLONGMSG_ATTR_TIME_DT));
+            }
+
+            final String nodeIp = attrMap.get(INLONGMSG_ATTR_NODE_IP);
+            final String srcIp = nodeIp != null ? nodeIp : DEFAULT_IP;
+
+            for (Iterator<byte[]> bodies = parsed.getIterator(attr); bodies.hasNext(); ) {
+                final byte[] body = bodies.next();
+                if (body == null) {
+                    continue;
+                }
+                final InLongMessage message = new InLongMessage(groupId, streamId, msgTime,
+                        srcIp, body, attrMap);
+                decoded.add(message);
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addDecompressionConsumeSize(message.getBody().length);
+            }
+        }
+        return decoded;
+    }
+
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
index 6bacc8c22..2a8774255 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
@@ -17,12 +17,151 @@
package org.apache.inlong.sdk.sort.util;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
public class StringUtil {
+ private static final int STATE_NORMAL = 0;
+ private static final int STATE_KEY = 2;
+ private static final int STATE_VALUE = 4;
+ private static final int STATE_ESCAPING = 8;
+ private static final int STATE_QUOTING = 16;
+
+    /**
+     * Splits a key-value text into a map of fields.
+     *
+     * <p>Both escaping and quoting are supported. When the escape character is
+     * not {@code null}, the single character following it is taken literally.
+     * When the quote character is not {@code null}, all characters between a
+     * pair of quote characters are taken literally.</p>
+     *
+     * <p>If a key occurs more than once, the value of its last occurrence is kept.</p>
+     *
+     * @param text The text to be split.
+     * @param entryDelimiter The delimiter of entries.
+     * @param kvDelimiter The delimiter between key and value.
+     * @param escapeChar The escaping character. Escaping is disabled if {@code null}.
+     * @param quoteChar The quoting character. Quoting is disabled if {@code null}.
+     * @return The fields split from the text.
+     * @throws IllegalArgumentException if a delimiter appears where it is not expected, if the
+     *         text ends while a key is still being read (this includes empty text and a trailing
+     *         entry delimiter), or if an escape or a quote is still open at the end of the text.
+     */
+    @SuppressWarnings("checkstyle:MissingSwitchDefault")
+    public static Map<String, String> splitKv(
+            @Nonnull String text,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar
+    ) {
+        Map<String, String> fields = new HashMap<>();
+
+        StringBuilder stringBuilder = new StringBuilder();
+
+        String key = "";
+        String value;
+
+        int state = STATE_KEY;
+
+        /*
+         * The state when entering escaping and quoting. When we exit escaping
+         * or quoting, we should restore this state.
+         */
+        int kvState = STATE_KEY;
+
+        for (int i = 0; i < text.length(); ++i) {
+            char ch = text.charAt(i);
+
+            if (ch == kvDelimiter) {
+                switch (state) {
+                    case STATE_KEY:
+                        key = stringBuilder.toString();
+                        stringBuilder.setLength(0);
+                        state = STATE_VALUE;
+                        break;
+                    case STATE_VALUE:
+                        throw new IllegalArgumentException("Unexpected token " + ch + " at position " + i + ".");
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                }
+            } else if (ch == entryDelimiter) {
+                switch (state) {
+                    case STATE_KEY:
+                        throw new IllegalArgumentException("Unexpected token " + ch + " at position " + i + ".");
+                    case STATE_VALUE:
+                        value = stringBuilder.toString();
+                        fields.put(key, value);
+
+                        stringBuilder.setLength(0);
+                        state = STATE_KEY;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                }
+            } else if (escapeChar != null && ch == escapeChar) {
+                switch (state) {
+                    case STATE_KEY:
+                    case STATE_VALUE:
+                        kvState = state;
+                        state = STATE_ESCAPING;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        stringBuilder.append(ch);
+                        break;
+                }
+            } else if (quoteChar != null && ch == quoteChar) {
+                switch (state) {
+                    case STATE_KEY:
+                    case STATE_VALUE:
+                        kvState = state;
+                        state = STATE_QUOTING;
+                        break;
+                    case STATE_ESCAPING:
+                        stringBuilder.append(ch);
+                        state = kvState;
+                        break;
+                    case STATE_QUOTING:
+                        state = kvState;
+                        break;
+                }
+            } else {
+                stringBuilder.append(ch);
+                // An escape covers only the one character that follows it. Without leaving the
+                // escaping state here, an escaped ordinary character would also swallow the
+                // next delimiter, escape or quote character.
+                if (state == STATE_ESCAPING) {
+                    state = kvState;
+                }
+            }
+        }
+
+        switch (state) {
+            case STATE_KEY:
+                throw new IllegalArgumentException("Dangling key.");
+            case STATE_VALUE:
+                value = stringBuilder.toString();
+                fields.put(key, value);
+                return fields;
+            case STATE_ESCAPING:
+                throw new IllegalArgumentException("Not closed escaping.");
+            case STATE_QUOTING:
+                throw new IllegalArgumentException("Not closed quoting.");
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
/**
* formatDate
*
@@ -106,4 +245,34 @@ public class StringUtil {
return sdf.format(date);
}
+    /**
+     * Parses a date time from string format to Unix epoch milliseconds.
+     *
+     * <p>Only the <b>yyyyMMdd</b>, <b>yyyyMMddHH</b> and <b>yyyyMMddHHmm</b> precisions are
+     * supported, whose lengths are 8, 10 and 12 respectively. A value of 8 or 9 characters is
+     * read with day precision, 10 or 11 characters with hour precision, and 12 or more
+     * characters with minute precision; characters beyond the chosen precision are ignored.</p>
+     *
+     * @param value Date time in string format.
+     * @return The parsed time in milliseconds, or -1 if the value is shorter than 8 characters.
+     * @throws IllegalArgumentException if the value cannot be parsed with the chosen pattern;
+     *         the underlying {@link ParseException} is attached as the cause.
+     */
+    public static long parseDateTime(String value) {
+        if (value.length() < 8) {
+            return -1;
+        }
+
+        // Pick the widest supported precision the value is long enough to carry.
+        final String pattern;
+        if (value.length() <= 9) {
+            pattern = "yyyyMMdd";
+        } else if (value.length() <= 11) {
+            pattern = "yyyyMMddHH";
+        } else {
+            pattern = "yyyyMMddHHmm";
+        }
+
+        try {
+            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
+            Date date = new SimpleDateFormat(pattern).parse(value.substring(0, pattern.length()));
+            return date.getTime();
+        } catch (ParseException e) {
+            // Keep the original exception as the cause so the parse position is not lost.
+            throw new IllegalArgumentException("Unexpected time format : " + value, e);
+        }
+    }
+
}
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
index 04307336a..815f1047c 100644
--- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -79,24 +80,27 @@ public class MessageDeserializerTest {
@Test
public void testDeserialize() {
- //1. setUp
+ // 1. setUp
try {
setUp();
} catch (Exception e) {
e.printStackTrace();
}
- //2. testDeserializeVersion0
+ // 2. testDeserializeVersion0
testDeserializeVersion0();
- //3. testDeserializeVersion1CompressionType0
+ // 3. testDeserializeVersion1CompressionType0
testDeserializeVersion1CompressionType0();
- //4. testDeserializeVersion1CompressionType1
+ // 4. testDeserializeVersion1CompressionType1
testDeserializeVersion1CompressionType1();
- //5. testDeserializeVersion1CompressionType2
+ // 5. testDeserializeVersion1CompressionType2
testDeserializeVersion1CompressionType2();
+
+ // 6. DeserializeVersion2NoCompress
+ testDeserializeVersion2NoCompress();
}
private void testDeserializeVersion0() {
@@ -165,6 +169,33 @@ public class MessageDeserializerTest {
}
}
+    /**
+     * Builds an InLongMsg (created with {@code InLongMsg.newInLongMsg(false)}) that carries ten
+     * copies of one body under a single attribute string, deserializes it with the "version"
+     * header set to 2, and checks the number of messages and the body of the first one.
+     */
+    private void testDeserializeVersion2NoCompress() {
+        try {
+            String groupId = "sort_sdk_test_group_id";
+            String streamId = "sort_sdk_test_stream_id";
+            String attr = "m=0";
+            String ip = "1.2.3.4";
+            long dt = System.currentTimeMillis();
+            StringBuilder newAttrBuilder = new StringBuilder(attr);
+            newAttrBuilder.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
+                    .append("&dt=").append(dt).append("&NodeIP=").append(ip);
+            InLongMsg inlongMsg = InLongMsg.newInLongMsg(false);
+            String msg = "sort sdk inlong msg test";
+            for (int i = 0; i < 10; i++) {
+                byte[] bytes = msg.getBytes();
+                inlongMsg.addMsg(newAttrBuilder.toString(), bytes);
+            }
+            Map<String, String> header = new HashMap<>();
+            header.put("version", "2");
+            List<InLongMessage> deserialize = messageDeserializer
+                    .deserialize(context, inLongTopic, header, inlongMsg.buildArray());
+            Assert.assertEquals(10, deserialize.size());
+            Assert.assertEquals(msg, new String(deserialize.get(0).getBody()));
+        } catch (Exception e) {
+            // Only exceptions are caught, so a failed assertion (an Error) propagates on its
+            // own. Any exception is turned into a test failure instead of being printed and
+            // ignored, which would let the test pass without verifying anything.
+            throw new AssertionError("Deserializing InLongMsg (version 2) failed", e);
+        }
+    }
+
private void prepareTestMessageObjs() {
headers.put("version", "1");
testData = "test data";
@@ -183,4 +214,4 @@ public class MessageDeserializerTest {
messageObjs = MessageObjs.newBuilder().addMsgs(messageObj1).addMsgs(messageObj2).build();
}
-}
\ No newline at end of file
+}