You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/21 15:30:48 UTC

[inlong] branch master updated: [INLONG-4730][Sort] Fix meta field format is null when parse json to sql (#4731)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c3671c2 [INLONG-4730][Sort] Fix meta field format is null when parse json to sql (#4731)
73c3671c2 is described below

commit 73c3671c24b67778eb6c330bda4b9d22896cd27d
Author: pacino <ge...@gmail.com>
AuthorDate: Tue Jun 21 23:30:42 2022 +0800

    [INLONG-4730][Sort] Fix meta field format is null when parse json to sql (#4731)
---
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  2 +-
 .../sort/parser/DataTypeConvertSqlParseTest.java   | 39 +++++++++++++++++++---
 .../inlong/sort/formats/base/TableFormatUtils.java |  6 ++--
 3 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index f34cc098d..a38b2c2d2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -483,7 +483,7 @@ public class FlinkSqlParser implements Parser {
                         && outputField != null
                         && outputField.getFormatInfo() != null
                         && outputField.getFormatInfo().getTypeInfo().equals(formatInfo.getTypeInfo());
-                if (sameType) {
+                if (sameType || field.getFormatInfo() == null) {
                     sb.append("\n    ").append(inputField.format()).append(" AS ").append(field.format()).append(",");
                 } else {
                     String targetType = TableFormatUtils.deriveLogicalType(field.getFormatInfo()).asSummaryString();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
index 0094614ec..60180ed68 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
@@ -28,12 +29,14 @@ import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
 import org.apache.inlong.sort.parser.result.ParseResult;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
@@ -50,9 +53,11 @@ import java.util.stream.Collectors;
 public class DataTypeConvertSqlParseTest extends AbstractTestBase {
 
     private KafkaExtractNode buildKafkaExtractNode() {
+        MetaFieldInfo metaFieldInfo = new MetaFieldInfo("PROCESS_TIME", MetaField.PROCESS_TIME);
         List<FieldInfo> fields = Arrays.asList(
                 new FieldInfo("id", new LongFormatInfo()),
-                new FieldInfo("age", new IntFormatInfo())
+                new FieldInfo("age", new IntFormatInfo()),
+                metaFieldInfo
         );
         return new KafkaExtractNode("1", "kafka_input", fields, null,
                 null, "topic_input", "localhost:9092",
@@ -60,6 +65,27 @@ public class DataTypeConvertSqlParseTest extends AbstractTestBase {
                 null, "group_1");
     }
 
+    /**
+     * Build a transform node
+     *
+     * @return A transform node
+     */
+    private Node buildTransformNode() {
+        return new TransformNode("4", "transform_node",
+                Arrays.asList(
+                        new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        new FieldInfo("PROCESS_TIME", null)
+                ), Arrays.asList(
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
+                        new FieldInfo("age", new IntFormatInfo())),
+                new FieldRelation(new FieldInfo("PROCESS_TIME", "3", null),
+                        new FieldInfo("PROCESS_TIME", null))
+        ), null, null);
+    }
+
     private KafkaLoadNode buildKafkaLoadNode() {
         List<FieldInfo> fields = Arrays.asList(
                 new FieldInfo("id", new StringFormatInfo()),
@@ -100,12 +126,15 @@ public class DataTypeConvertSqlParseTest extends AbstractTestBase {
         env.enableCheckpointing(10000);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
         Node inputNode = buildKafkaExtractNode();
+        Node transformNode = buildTransformNode();
         Node outputNode = buildKafkaLoadNode();
         StreamInfo streamInfo = new StreamInfo("1",
-                Arrays.asList(inputNode, outputNode),
-                Collections.singletonList(
-                        buildNodeRelation(Collections.singletonList(inputNode), Collections.singletonList(outputNode))
-                ));
+                Arrays.asList(inputNode, transformNode, outputNode),
+                Arrays.asList(
+                        buildNodeRelation(Collections.singletonList(inputNode),
+                                Collections.singletonList(transformNode)),
+                        buildNodeRelation(Collections.singletonList(transformNode),
+                                Collections.singletonList(outputNode))));
         GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
         FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
         ParseResult result = parser.parse();
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index e0b572f59..ae5f4413e 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -316,7 +316,8 @@ public class TableFormatUtils {
         } else if (logicalType instanceof NullType) {
             return NullFormatInfo.INSTANCE;
         } else {
-            throw new UnsupportedOperationException();
+            throw new IllegalArgumentException(String.format("not found logicalType %s",
+                    logicalType == null ? "null" : logicalType.toString()));
         }
     }
 
@@ -375,7 +376,8 @@ public class TableFormatUtils {
         } else if (formatInfo instanceof NullFormatInfo) {
             return new NullType();
         } else {
-            throw new UnsupportedOperationException();
+            throw new IllegalArgumentException(String.format("not found formatInfo %s",
+                    formatInfo == null ? "null" : formatInfo.toString()));
         }
     }