You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 07:23:42 UTC

[incubator-inlong] branch master updated: [INLONG-2910][Sort] Fix deserialization error of BuiltInField json string (#2911)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 694a1c0  [INLONG-2910][Sort] Fix deserialization error of BuiltInField json string (#2911)
694a1c0 is described below

commit 694a1c08c3234d92d6307b3905aaf0a93e4f6de8
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Sun Mar 6 15:23:35 2022 +0800

    [INLONG-2910][Sort] Fix deserialization error of BuiltInField json string (#2911)
---
 .../org/apache/inlong/sort/protocol/FieldInfo.java | 10 ++++++++
 ...ieldInfoTest.java => BuiltInFieldInfoTest.java} | 29 +++++++++++++---------
 .../apache/inlong/sort/protocol/FieldInfoTest.java |  2 +-
 .../sort/protocol/kafka/KafkaSinkInfoTest.java     | 25 +++++++++++--------
 4 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index f8b1cb9..7417d8c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.protocol;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -25,6 +27,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import java.io.Serializable;
 import java.util.Objects;
 
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
+        @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
+})
 public class FieldInfo implements Serializable {
 
     private static final long serialVersionUID = 5871970550803344673L;
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
similarity index 52%
copy from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
index 4c086ec..2cc2267 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
@@ -17,19 +17,24 @@
 
 package org.apache.inlong.sort.protocol;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Test;
 
-public class FieldInfoTest {
-    @Test
-    public void testSerialize() throws JsonProcessingException {
-        FieldInfo fieldInfo = new FieldInfo("field_name", StringFormatInfo.INSTANCE);
-        ObjectMapper objectMapper = new ObjectMapper();
-        String expected = "{\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
-        assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
+public class BuiltInFieldInfoTest extends ProtocolBaseTest {
+
+    @Override
+    public void init() {
+        expectedObject = new BuiltInFieldInfo("f1", StringFormatInfo.INSTANCE, BuiltInFieldInfo.BuiltInField.DATA_TIME);
+        expectedJson = "{\n"
+                + "    \"type\":\"builtin\",\n"
+                + "    \"name\":\"f1\",\n"
+                + "    \"format_info\":{\n"
+                + "        \"type\":\"string\"\n"
+                + "    },\n"
+                + "    \"builtin_field\":\"DATA_TIME\"\n"
+                + "}";
+
+        equalObj1 = expectedObject;
+        equalObj2 = new BuiltInFieldInfo("f1", StringFormatInfo.INSTANCE, BuiltInFieldInfo.BuiltInField.DATA_TIME);
+        unequalObj = new BuiltInFieldInfo("f2", StringFormatInfo.INSTANCE, BuiltInFieldInfo.BuiltInField.DATA_TIME);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index 4c086ec..7d58721 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -29,7 +29,7 @@ public class FieldInfoTest {
     public void testSerialize() throws JsonProcessingException {
         FieldInfo fieldInfo = new FieldInfo("field_name", StringFormatInfo.INSTANCE);
         ObjectMapper objectMapper = new ObjectMapper();
-        String expected = "{\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
+        String expected = "{\"type\":\"base\",\"name\":\"field_name\",\"format_info\":{\"type\":\"string\"}}";
         assertEquals(expected, objectMapper.writeValueAsString(fieldInfo));
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
index f9b2fcc..dfaacb0 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/kafka/KafkaSinkInfoTest.java
@@ -36,18 +36,21 @@ public class KafkaSinkInfoTest extends ProtocolBaseTest {
         );
 
         expectedJson = "{\n"
-                + "  \"type\" : \"kafka\",\n"
-                + "  \"fields\" : [ {\n"
-                + "    \"name\" : \"field1\",\n"
-                + "    \"format_info\" : {\n"
-                + "      \"type\" : \"string\"\n"
+                + "    \"type\":\"kafka\",\n"
+                + "    \"fields\":[\n"
+                + "        {\n"
+                + "            \"type\":\"base\",\n"
+                + "            \"name\":\"field1\",\n"
+                + "            \"format_info\":{\n"
+                + "                \"type\":\"string\"\n"
+                + "            }\n"
+                + "        }\n"
+                + "    ],\n"
+                + "    \"address\":\"testAddress\",\n"
+                + "    \"topic\":\"testTopic\",\n"
+                + "    \"serialization_info\":{\n"
+                + "        \"type\":\"json\"\n"
                 + "    }\n"
-                + "  } ],\n"
-                + "  \"address\" : \"testAddress\",\n"
-                + "  \"topic\" : \"testTopic\",\n"
-                + "  \"serialization_info\" : {\n"
-                + "    \"type\" : \"json\"\n"
-                + "  }\n"
                 + "}";
 
         equalObj1 = expectedObject;