You are viewing a plain-text version of this content; the canonical version, with its original formatting and link, is available in the mailing-list archive.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/21 15:30:48 UTC
[inlong] branch master updated: [INLONG-4730][Sort] Fix meta field format is null when parse json to sql (#4731)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73c3671c2 [INLONG-4730][Sort] Fix meta field format is null when parse json to sql (#4731)
73c3671c2 is described below
commit 73c3671c24b67778eb6c330bda4b9d22896cd27d
Author: pacino <ge...@gmail.com>
AuthorDate: Tue Jun 21 23:30:42 2022 +0800
[INLONG-4730][Sort] Fix meta field format is null when parse json to sql (#4731)
---
.../inlong/sort/parser/impl/FlinkSqlParser.java | 2 +-
.../sort/parser/DataTypeConvertSqlParseTest.java | 39 +++++++++++++++++++---
.../inlong/sort/formats/base/TableFormatUtils.java | 6 ++--
3 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index f34cc098d..a38b2c2d2 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -483,7 +483,7 @@ public class FlinkSqlParser implements Parser {
&& outputField != null
&& outputField.getFormatInfo() != null
&& outputField.getFormatInfo().getTypeInfo().equals(formatInfo.getTypeInfo());
- if (sameType) {
+ if (sameType || field.getFormatInfo() == null) {
sb.append("\n ").append(inputField.format()).append(" AS ").append(field.format()).append(",");
} else {
String targetType = TableFormatUtils.deriveLogicalType(field.getFormatInfo()).asSummaryString();
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
index 0094614ec..60180ed68 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DataTypeConvertSqlParseTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
@@ -28,12 +29,14 @@ import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.parser.result.ParseResult;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
@@ -50,9 +53,11 @@ import java.util.stream.Collectors;
public class DataTypeConvertSqlParseTest extends AbstractTestBase {
private KafkaExtractNode buildKafkaExtractNode() {
+ MetaFieldInfo metaFieldInfo = new MetaFieldInfo("PROCESS_TIME", MetaField.PROCESS_TIME);
List<FieldInfo> fields = Arrays.asList(
new FieldInfo("id", new LongFormatInfo()),
- new FieldInfo("age", new IntFormatInfo())
+ new FieldInfo("age", new IntFormatInfo()),
+ metaFieldInfo
);
return new KafkaExtractNode("1", "kafka_input", fields, null,
null, "topic_input", "localhost:9092",
@@ -60,6 +65,27 @@ public class DataTypeConvertSqlParseTest extends AbstractTestBase {
null, "group_1");
}
+ /**
+ * Build a transform node
+ *
+ * @return A transform node
+ */
+ private Node buildTransformNode() {
+ return new TransformNode("4", "transform_node",
+ Arrays.asList(
+ new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("PROCESS_TIME", null)
+ ), Arrays.asList(
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelation(new FieldInfo("PROCESS_TIME", "3", null),
+ new FieldInfo("PROCESS_TIME", null))
+ ), null, null);
+ }
+
private KafkaLoadNode buildKafkaLoadNode() {
List<FieldInfo> fields = Arrays.asList(
new FieldInfo("id", new StringFormatInfo()),
@@ -100,12 +126,15 @@ public class DataTypeConvertSqlParseTest extends AbstractTestBase {
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
Node inputNode = buildKafkaExtractNode();
+ Node transformNode = buildTransformNode();
Node outputNode = buildKafkaLoadNode();
StreamInfo streamInfo = new StreamInfo("1",
- Arrays.asList(inputNode, outputNode),
- Collections.singletonList(
- buildNodeRelation(Collections.singletonList(inputNode), Collections.singletonList(outputNode))
- ));
+ Arrays.asList(inputNode, transformNode, outputNode),
+ Arrays.asList(
+ buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(transformNode)),
+ buildNodeRelation(Collections.singletonList(transformNode),
+ Collections.singletonList(outputNode))));
GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
ParseResult result = parser.parse();
diff --git a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index e0b572f59..ae5f4413e 100644
--- a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -316,7 +316,8 @@ public class TableFormatUtils {
} else if (logicalType instanceof NullType) {
return NullFormatInfo.INSTANCE;
} else {
- throw new UnsupportedOperationException();
+ throw new IllegalArgumentException(String.format("not found logicalType %s",
+ logicalType == null ? "null" : logicalType.toString()));
}
}
@@ -375,7 +376,8 @@ public class TableFormatUtils {
} else if (formatInfo instanceof NullFormatInfo) {
return new NullType();
} else {
- throw new UnsupportedOperationException();
+ throw new IllegalArgumentException(String.format("not found formatInfo %s",
+ formatInfo == null ? "null" : formatInfo.toString()));
}
}