You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 11:23:37 UTC
[incubator-inlong] branch master updated: [INLONG-699] Fix
serialization issue of DeserializationInfo 's subType in inlong-sort
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ddb8f2e [INLONG-699] Fix serialization issue of DeserializationInfo 's subType in inlong-sort
ddb8f2e is described below
commit ddb8f2e031a0bf974fa6c7b3645a5c889454f483
Author: tianqiwan <ti...@tencent.com>
AuthorDate: Fri Jul 9 19:10:03 2021 +0800
[INLONG-699] Fix serialization issue of DeserializationInfo 's subType in inlong-sort
---
.../protocol/deserialization/DeserializationInfo.java | 5 +++++
.../deserialization/TDMsgDeserializationInfo.java | 16 +---------------
.../inlong/sort/protocol/DeserializationInfoTest.java | 19 +++++++++++++++++++
3 files changed, 25 insertions(+), 15 deletions(-)
diff --git a/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index 187d4da..f911311 100644
--- a/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -32,6 +32,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
property = "type")
@JsonSubTypes({
@Type(value = CsvDeserializationInfo.class, name = "csv"),
+ @Type(value = TDMsgCsvDeserializationInfo.class, name = "tdmsg_csv"),
+ @Type(value = TDMsgCsv2DeserializationInfo.class, name = "tdmsg_csv2"),
+ @Type(value = TDMsgKvDeserializationInfo.class, name = "tdmsg_kv"),
+ @Type(value = TDMsgTlogCsvDeserializationInfo.class, name = "tdmsg_tlog_csv"),
+ @Type(value = TDMsgTlogKvDeserializationInfo.class, name = "tdmsg_tlog_kv")
})
public interface DeserializationInfo extends Serializable {
diff --git a/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java b/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
index f00a7d1..1b36161 100644
--- a/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
+++ b/inlong-sort/common/src/main/java/org/apache/inlong/sort/protocol/deserialization/TDMsgDeserializationInfo.java
@@ -20,24 +20,10 @@ package org.apache.inlong.sort.protocol.deserialization;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes.Type;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
- * .
+ * TDMsgDeserializationInfo.
*/
-@JsonTypeInfo(
- use = JsonTypeInfo.Id.NAME,
- include = JsonTypeInfo.As.PROPERTY,
- property = "type")
-@JsonSubTypes({
- @Type(value = TDMsgCsvDeserializationInfo.class, name = "tdmsg_csv"),
- @Type(value = TDMsgCsv2DeserializationInfo.class, name = "tdmsg_csv2"),
- @Type(value = TDMsgKvDeserializationInfo.class, name = "tdmsg_kv"),
- @Type(value = TDMsgTlogCsvDeserializationInfo.class, name = "tdmsg_tlog_csv"),
- @Type(value = TDMsgTlogKvDeserializationInfo.class, name = "tdmsg_tlog_kv"),
-})
public abstract class TDMsgDeserializationInfo implements DeserializationInfo {
private static final long serialVersionUID = 3707412713264864315L;
diff --git a/inlong-sort/common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java b/inlong-sort/common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
index a09da7f..5959752 100644
--- a/inlong-sort/common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
+++ b/inlong-sort/common/src/test/java/org/apache/inlong/sort/protocol/DeserializationInfoTest.java
@@ -19,9 +19,14 @@ package org.apache.inlong.sort.protocol;
import static org.junit.Assert.assertEquals;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
import java.io.IOException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
+import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo.PartitionStrategy;
+import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.junit.Test;
public class DeserializationInfoTest {
@@ -33,4 +38,18 @@ public class DeserializationInfoTest {
str.getBytes(), CsvDeserializationInfo.class);
assertEquals('&', deserializationInfo.getSplitter());
}
+
+ @Test
+ public void testToJson() throws JsonProcessingException { // smoke test: passes as long as serializing the DataFlowInfo below does not throw
+ DataFlowInfo dataFlowInfo = new DataFlowInfo(
+ 1,
+ new TubeSourceInfo("topic" + System.currentTimeMillis(), "ma", "cg", // first argument gets a timestamp suffix, so it differs on every run
+ new TDMsgCsvDeserializationInfo("tid", ','), new FieldInfo[0]), // a TDMsg CSV deserialization info is nested inside the source info
+ new ClickHouseSinkInfo("url", "dn", "tn", "un", "pw",
+ false, PartitionStrategy.HASH, "pk", new FieldInfo[0], new String[0],
+ 100, 100, 100)); // NOTE(review): meaning of the three 100s is not visible here - check the ClickHouseSinkInfo constructor
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ System.out.println(objectMapper.writeValueAsString(dataFlowInfo)); // NOTE(review): JSON is only printed, never asserted - consider a round-trip check
+ }
}