You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/28 07:16:03 UTC

[inlong] branch master updated: [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f8c9cff [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
27f8c9cff is described below

commit 27f8c9cffe4372e3248de79001fe028adba74986
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Fri Oct 28 15:15:57 2022 +0800

    [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../sort/base/format/JsonDynamicSchemaFormat.java  |  2 +-
 .../format/CanalJsonDynamicSchemaFormatTest.java   | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index f49f01c0b..a0d564d0c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -99,7 +99,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
 
     protected JsonDynamicSchemaFormat() {
         this.rowDataConverters =
-                new JsonToRowDataConverters(true, false, TimestampFormat.ISO_8601);
+                new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
     }
 
     /**
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index ffa45e8c2..dc7ce5ec8 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -18,11 +18,16 @@
 package org.apache.inlong.sort.base.format;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.types.RowKind;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -96,6 +101,25 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
         Assert.assertEquals(values, Collections.singletonList("111"));
     }
 
+    @Test
+    public void testExtractRowData() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        List<RowData> values = getDynamicSchemaFormat().extractRowData(rootNode);
+        List<RowData> rowDataList = new ArrayList<>();
+        rowDataList.add(GenericRowData.ofKind(RowKind.UPDATE_BEFORE,
+                111,
+                BinaryStringData.fromString("scooter"),
+                BinaryStringData.fromString("Big 2-wheel scooter"),
+                5.15f));
+        rowDataList.add(GenericRowData.ofKind(RowKind.UPDATE_AFTER,
+                111,
+                BinaryStringData.fromString("scooter"),
+                BinaryStringData.fromString("Big 2-wheel scooter"),
+                5.18f));
+        Assert.assertEquals(values, rowDataList);
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {