You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/28 07:16:03 UTC
[inlong] branch master updated: [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 27f8c9cff [INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
27f8c9cff is described below
commit 27f8c9cffe4372e3248de79001fe028adba74986
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Fri Oct 28 15:15:57 2022 +0800
[INLONG-6311][Sort] Fix missing fields error for the old Canal JSON data in multiple sink scenes (#6312)
Co-authored-by: thesumery <15...@qq.com>
---
.../sort/base/format/JsonDynamicSchemaFormat.java | 2 +-
.../format/CanalJsonDynamicSchemaFormatTest.java | 24 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index f49f01c0b..a0d564d0c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -99,7 +99,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
protected JsonDynamicSchemaFormat() {
this.rowDataConverters =
- new JsonToRowDataConverters(true, false, TimestampFormat.ISO_8601);
+ new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
}
/**
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index ffa45e8c2..dc7ce5ec8 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -18,11 +18,16 @@
package org.apache.inlong.sort.base.format;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.types.RowKind;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -96,6 +101,25 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
Assert.assertEquals(values, Collections.singletonList("111"));
}
+ @Test
+ public void testExtractRowData() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ List<RowData> values = getDynamicSchemaFormat().extractRowData(rootNode);
+ List<RowData> rowDataList = new ArrayList<>();
+ rowDataList.add(GenericRowData.ofKind(RowKind.UPDATE_BEFORE,
+ 111,
+ BinaryStringData.fromString("scooter"),
+ BinaryStringData.fromString("Big 2-wheel scooter"),
+ 5.15f));
+ rowDataList.add(GenericRowData.ofKind(RowKind.UPDATE_AFTER,
+ 111,
+ BinaryStringData.fromString("scooter"),
+ BinaryStringData.fromString("Big 2-wheel scooter"),
+ 5.18f));
+ Assert.assertEquals(values, rowDataList);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {