You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/19 08:42:47 UTC
[inlong] branch master updated: [INLONG-5599][Sort] Add temporal join support for MySQL (#5600)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5cfd05b53 [INLONG-5599][Sort] Add temporal join support for MySQL (#5600)
5cfd05b53 is described below
commit 5cfd05b5393de6789c6e7ebb3e3a27fdab5381ba
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Fri Aug 19 16:42:42 2022 +0800
[INLONG-5599][Sort] Add temporal join support for MySQL (#5600)
---
.../protocol/node/extract/MySqlExtractNode.java | 164 ++++++++++--
.../protocol/transformation/WatermarkField.java | 16 +-
...java => InnerTemporalJoinRelationRelation.java} | 8 +-
.../transformation/relation/JoinRelation.java | 4 +-
... => LeftOuterTemporalJoinRelationRelation.java} | 4 +-
.../transformation/relation/NodeRelation.java | 4 +-
...TemporalJoin.java => TemporalJoinRelation.java} | 16 +-
.../inlong/sort/parser/impl/FlinkSqlParser.java | 8 +-
.../MySqlTemporalJoinRelationSqlParseTest.java | 285 +++++++++++++++++++++
... => RedisTemporalJoinRelationSqlParseTest.java} | 15 +-
10 files changed, 471 insertions(+), 53 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index 750b9b6e6..b1d0f2783 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -30,6 +30,7 @@ import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.Metadata;
+import org.apache.inlong.sort.protocol.enums.ExtractMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -77,35 +78,137 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
@JsonInclude(Include.NON_NULL)
@JsonProperty("serverTimeZone")
private String serverTimeZone;
+ @Nonnull
+ @JsonProperty("extractMode")
+ private ExtractMode extractMode;
+ @JsonProperty("url")
+ private String url;
+
+ /**
+ * Constructor only used for {@link ExtractMode#CDC}
+ *
+ * @param id The id of node
+ * @param name The name of node
+ * @param fields The field list of node
+ * @param watermarkField The watermark field of node only used for {@link ExtractMode#CDC}
+ * @param properties The properties connect to mysql
+ * @param primaryKey The primary key of the mysql table
+ * @param tableNames The table names connect to mysql
+ * @param hostname The hostname connect to mysql only used for {@link ExtractMode#CDC}
+ * @param username The username connect to mysql
+ * @param password The password connect to mysql
+ * @param database The database connect to mysql only used for {@link ExtractMode#CDC}
+ * @param port The port connect to mysql only used for {@link ExtractMode#CDC}
+ * @param serverId The server id connect to mysql only used for {@link ExtractMode#CDC}
+ * @param incrementalSnapshotEnabled The incremental snapshot enabled connect to mysql
+ * only used for {@link ExtractMode#CDC}
+ * @param serverTimeZone The server time zone connect to mysql only used for {@link ExtractMode#CDC}
+ */
+ public MySqlExtractNode(@Nonnull @JsonProperty("id") String id,
+ @Nonnull @JsonProperty("name") String name,
+ @Nonnull @JsonProperty("fields") List<FieldInfo> fields,
+ @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+ @Nullable @JsonProperty("properties") Map<String, String> properties,
+ @Nullable @JsonProperty("primaryKey") String primaryKey,
+ @Nonnull @JsonProperty("tableNames") List<String> tableNames,
+ @Nonnull @JsonProperty("hostname") String hostname,
+ @Nonnull @JsonProperty("username") String username,
+ @Nonnull @JsonProperty("password") String password,
+ @Nonnull @JsonProperty("database") String database,
+ @Nullable @JsonProperty("port") Integer port,
+ @Nullable @JsonProperty("serverId") Integer serverId,
+ @Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
+ @Nullable @JsonProperty("serverTimeZone") String serverTimeZone) {
+ this(id, name, fields, watermarkField, properties, primaryKey, tableNames, hostname, username, password,
+ database, port, serverId, incrementalSnapshotEnabled, serverTimeZone, ExtractMode.CDC, null);
+ }
+ /**
+ * Constructor only used for {@link ExtractMode#SCAN}
+ *
+ * @param id The id of node
+ * @param name The name of node
+ * @param fields The field list of node
+ * @param properties The properties connect to mysql
+ * @param primaryKey The primary key of the mysql table
+ * @param tableNames The table names connect to mysql
+ * @param username The username connect to mysql
+ * @param password The password connect to mysql
+ * @param url The url connect to mysql only used for {@link ExtractMode#SCAN}
+ */
+ public MySqlExtractNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @Nullable @JsonProperty("properties") Map<String, String> properties,
+ @Nullable @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("tableNames") @Nonnull List<String> tableNames,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password,
+ @Nullable @JsonProperty("url") String url) {
+ this(id, name, fields, null, properties, primaryKey, tableNames, null, username,
+ password, null, null, null, null, null,
+ ExtractMode.SCAN, url);
+ }
+
+ /**
+ * Base constructor
+ *
+ * @param id The id of node
+ * @param name The name of node
+ * @param fields The field list of node
+ * @param watermarkField The watermark field of node only used for {@link ExtractMode#CDC}
+ * @param properties The properties connect to mysql
+ * @param primaryKey The primary key of the mysql table
+ * @param tableNames The table names connect to mysql
+ * @param hostname The hostname connect to mysql only used for {@link ExtractMode#CDC}
+ * @param username The username connect to mysql
+ * @param password The password connect to mysql
+ * @param database The database connect to mysql only used for {@link ExtractMode#CDC}
+ * @param port The port connect to mysql only used for {@link ExtractMode#CDC}
+ * @param serverId The server id connect to mysql only used for {@link ExtractMode#CDC}
+ * @param incrementalSnapshotEnabled The incremental snapshot enabled connect to mysql
+ * only used for {@link ExtractMode#CDC}
+ * @param serverTimeZone The server time zone connect to mysql only used for {@link ExtractMode#CDC}
+ * @param extractMode The extract mode connect mysql {@link ExtractMode}
+ * @param url The url connect to mysql only used for {@link ExtractMode#SCAN}
+ */
@JsonCreator
public MySqlExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @Nullable @JsonProperty("watermarkField") WatermarkField waterMarkField,
- @JsonProperty("properties") Map<String, String> properties,
- @JsonProperty("primaryKey") String primaryKey,
+ @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+ @Nullable @JsonProperty("properties") Map<String, String> properties,
+ @Nullable @JsonProperty("primaryKey") String primaryKey,
@JsonProperty("tableNames") @Nonnull List<String> tableNames,
- @JsonProperty("hostname") String hostname,
+ @Nullable @JsonProperty("hostname") String hostname,
@JsonProperty("username") String username,
@JsonProperty("password") String password,
- @JsonProperty("database") String database,
- @JsonProperty("port") Integer port,
- @JsonProperty("serverId") Integer serverId,
- @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
- @JsonProperty("serverTimeZone") String serverTimeZone) {
- super(id, name, fields, waterMarkField, properties);
+ @Nullable @JsonProperty("database") String database,
+ @Nullable @JsonProperty("port") Integer port,
+ @Nullable @JsonProperty("serverId") Integer serverId,
+ @Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
+ @Nullable @JsonProperty("serverTimeZone") String serverTimeZone,
+ @Nonnull @JsonProperty("extractMode") ExtractMode extractMode,
+ @Nullable @JsonProperty("url") String url) {
+ super(id, name, fields, watermarkField, properties);
this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames is null");
Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
- this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
+ if (extractMode == ExtractMode.CDC) {
+ this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
+ this.database = Preconditions.checkNotNull(database, "database is null");
+ } else {
+ this.hostname = hostname;
+ this.database = database;
+ this.url = Preconditions.checkNotNull(url, "url is null");
+ }
this.username = Preconditions.checkNotNull(username, "username is null");
this.password = Preconditions.checkNotNull(password, "password is null");
- this.database = Preconditions.checkNotNull(database, "database is null");
this.primaryKey = primaryKey;
this.port = port;
this.serverId = serverId;
this.incrementalSnapshotEnabled = incrementalSnapshotEnabled;
this.serverTimeZone = serverTimeZone;
+ this.extractMode = extractMode;
}
@Override
@@ -121,23 +224,30 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "mysql-cdc-inlong");
- options.put("hostname", hostname);
+ if (extractMode == ExtractMode.CDC) {
+ options.put("connector", "mysql-cdc-inlong");
+ options.put("hostname", hostname);
+ options.put("database-name", database);
+ if (port != null) {
+ options.put("port", port.toString());
+ }
+ if (serverId != null) {
+ options.put("server-id", serverId.toString());
+ }
+ if (incrementalSnapshotEnabled != null) {
+ options.put("scan.incremental.snapshot.enabled", incrementalSnapshotEnabled.toString());
+ }
+ if (serverTimeZone != null) {
+ options.put("server-time-zone", serverTimeZone);
+ }
+ } else {
+ options.put("connector", "jdbc-inlong");
+ options.put("url", url);
+ Preconditions.checkState(tableNames.size() == 1,
+ "Only support one table for scan extract mode");
+ }
options.put("username", username);
options.put("password", password);
- options.put("database-name", database);
- if (port != null) {
- options.put("port", port.toString());
- }
- if (serverId != null) {
- options.put("server-id", serverId.toString());
- }
- if (incrementalSnapshotEnabled != null) {
- options.put("scan.incremental.snapshot.enabled", incrementalSnapshotEnabled.toString());
- }
- if (serverTimeZone != null) {
- options.put("server-time-zone", serverTimeZone);
- }
String formatTable = tableNames.size() == 1 ? tableNames.get(0) :
String.format("(%s)", StringUtils.join(tableNames, "|"));
options.put("table-name", String.format("%s", formatTable));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java
index a36ed79bc..d8949c140 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java
@@ -24,6 +24,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
import java.util.Arrays;
import java.util.List;
@@ -48,12 +49,23 @@ public class WatermarkField implements TimeWindowFunction {
@JsonProperty("interval") StringConstantParam interval,
@JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
- this.interval = Preconditions.checkNotNull(interval, "interval is null");
- this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ this.interval = interval;
+ this.timeUnit = timeUnit;
+ }
+
+ public WatermarkField(@JsonProperty("timeAttr") FieldInfo timeAttr) {
+ this(timeAttr, null, null);
}
@Override
public String format() {
+ if (interval == null) {
+ return String.format("%s FOR %s AS %s", getName(), timeAttr.format(), timeAttr.format());
+ }
+ if (timeUnit == null) {
+ return String.format("%s FOR %s AS %s - INTERVAL %s %s", getName(), timeAttr.format(),
+ timeAttr.format(), interval.format(), TimeUnit.SECOND.name());
+ }
return String.format("%s FOR %s AS %s - INTERVAL %s %s", getName(), timeAttr.format(),
timeAttr.format(), interval.format(), timeUnit.format());
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoin.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
similarity index 93%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoin.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
index 9cd4fafa0..2b9b5782d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoin.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
@@ -31,16 +31,16 @@ import java.util.List;
import java.util.Map;
/**
- * Inner temporal join
+ * Inner temporal join relation
*/
@JsonTypeName("innerTemporalJoin")
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class InnerTemporalJoin extends TemporalJoin {
+public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation {
/**
- * LeftOuterTemporalJoin Constructor
+ * Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
@@ -50,7 +50,7 @@ public class InnerTemporalJoin extends TemporalJoin {
* @param systemTime The system time for temporal join
*/
@JsonCreator
- public InnerTemporalJoin(
+ public InnerTemporalJoinRelationRelation(
@JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
index 9fd7687f1..c98e2be30 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
@@ -41,8 +41,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
@JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
@JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"),
- @JsonSubTypes.Type(value = InnerTemporalJoin.class, name = "innerTemporalJoin"),
- @JsonSubTypes.Type(value = LeftOuterTemporalJoin.class, name = "leftOuterTemporalJoin")
+ @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"),
+ @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
})
@EqualsAndHashCode(callSuper = true)
@Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoin.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
similarity index 94%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoin.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
index fcf4893f9..2a873381e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoin.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
@@ -37,7 +37,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class LeftOuterTemporalJoin extends TemporalJoin {
+public class LeftOuterTemporalJoinRelationRelation extends TemporalJoinRelation {
/**
* LeftOuterTemporalJoin Constructor
@@ -50,7 +50,7 @@ public class LeftOuterTemporalJoin extends TemporalJoin {
* @param systemTime The system time for temporal join
*/
@JsonCreator
- public LeftOuterTemporalJoin(
+ public LeftOuterTemporalJoinRelationRelation(
@JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
index 5e3e6c0eb..43a3e725c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
@@ -40,8 +40,8 @@ import java.util.List;
@JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
@JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
@JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"),
- @JsonSubTypes.Type(value = InnerTemporalJoin.class, name = "innerTemporalJoin"),
- @JsonSubTypes.Type(value = LeftOuterTemporalJoin.class, name = "leftOuterTemporalJoin"),
+ @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"),
+ @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin"),
@JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"),
@JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation")
})
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoin.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
similarity index 78%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoin.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
index c9c7a71d2..6a4fc1725 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoin.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
@@ -23,6 +23,8 @@ import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
@@ -31,12 +33,20 @@ import java.util.List;
import java.util.Map;
/**
- * Temporal join base class
+ * Temporal join relation base class
*/
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"),
+ @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
+})
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public abstract class TemporalJoin extends JoinRelation {
+public abstract class TemporalJoinRelation extends JoinRelation {
@JsonProperty("systemTime")
private FieldInfo systemTime;
@@ -52,7 +62,7 @@ public abstract class TemporalJoin extends JoinRelation {
* @param systemTime The system time for temporal join
*/
@JsonCreator
- public TemporalJoin(
+ public TemporalJoinRelation(
@JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap,
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 2f453e062..faa7fdf71 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -48,7 +48,7 @@ import org.apache.inlong.sort.protocol.transformation.Function;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
-import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoin;
+import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
import org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -425,8 +425,8 @@ public class FlinkSqlParser implements Parser {
.append(tableNameAliasMap.get(relation.getInputs().get(0)));
// Parse condition map of join and format condition to sql, such as on 1 = 1...
Map<String, List<FilterFunction>> conditionMap = relation.getJoinConditionMap();
- if (relation instanceof TemporalJoin) {
- parseTemporalJoin((TemporalJoin) relation, nodeMap, tableNameAliasMap, conditionMap, sb);
+ if (relation instanceof TemporalJoinRelation) {
+ parseTemporalJoin((TemporalJoinRelation) relation, nodeMap, tableNameAliasMap, conditionMap, sb);
} else {
parseRegularJoin(relation, nodeMap, tableNameAliasMap, conditionMap, sb);
}
@@ -454,7 +454,7 @@ public class FlinkSqlParser implements Parser {
}
}
- private void parseTemporalJoin(TemporalJoin relation, Map<String, Node> nodeMap,
+ private void parseTemporalJoin(TemporalJoinRelation relation, Map<String, Node> nodeMap,
Map<String, String> tableNameAliasMap, Map<String, List<FilterFunction>> conditionMap, StringBuilder sb) {
if (StringUtils.isBlank(relation.getSystemTime().getNodeId())) {
relation.getSystemTime().setNodeId(relation.getInputs().get(0));
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
new file mode 100644
index 000000000..69453fd95
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Test for temporal join with {@link FlinkSqlParser} and {@link MySqlExtractNode}
+ */
+public class MySqlTemporalJoinRelationSqlParseTest extends AbstractTestBase {
+
+ private KafkaExtractNode buildKafkaExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("order_time", new TimestampFormatInfo(3)),
+ new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+ );
+ return new KafkaExtractNode("1", "kafka_input", fields,
+ new WatermarkField(new FieldInfo("order_time", new TimestampFormatInfo(3))),
+ null, "orders", "localhost:9092",
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+ "groupId", null);
+ }
+
+ private MySqlExtractNode buildMySQLExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("currency", new LongFormatInfo()),
+ new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("update_time", new TimestampFormatInfo(3))
+ );
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("2", "mysql_input", fields,
+ new WatermarkField(new FieldInfo("update_time", new TimestampFormatInfo(3))), map,
+ "currency", Collections.singletonList("currency_rates"), "localhost",
+ "inlong", "inlong", "inlong", null,
+ null, null, null);
+ }
+
+ private MySqlExtractNode buildMySQLExtractNode2() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("currency", new LongFormatInfo()),
+ new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("update_time", new TimestampFormatInfo(3)),
+ new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+ );
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("2", "mysql_input", fields, map, "currency",
+ Collections.singletonList("currency_rates"), "inlong", "inlong",
+ "jdbc:mysql://localhost:3306/inlong");
+ }
+
+ private KafkaLoadNode buildKafkaLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("currency", new StringFormatInfo()),
+ new FieldInfo("order_time", new TimestampFormatInfo(3)),
+ new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2))
+ );
+ List<FieldRelation> relations = Arrays.asList(
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("price", "1", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("price", new DecimalFormatInfo(32, 2))),
+ new FieldRelation(new FieldInfo("currency", "1", new StringFormatInfo()),
+ new FieldInfo("currency", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("order_time", "1", new TimestampFormatInfo(3)),
+ new FieldInfo("order_time", new TimestampFormatInfo(3))),
+ new FieldRelation(new FieldInfo("conversion_rate", "2", new DecimalFormatInfo(32, 2)),
+ new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)))
+ );
+ return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
+ null, "orders_output", "localhost:9092", new CanalJsonFormat(),
+ null, null, null);
+ }
+
+ /**
+ * Build the temporal join relation that connects the extract nodes to the load nodes.
+ * The join condition registered for node "2" is: currency of node "1" equals currency of node "2".
+ *
+ * @param inputs extract nodes, their ids become the inputs of the relation
+ * @param outputs load nodes, their ids become the outputs of the relation
+ * @param left true to build a left outer temporal join, false to build an inner temporal join
+ * @param systemTime the time attribute field handed to the temporal join relation
+ * @return the temporal join relation
+ */
+ private TemporalJoinRelation buildNodeRelation(List<Node> inputs, List<Node> outputs, boolean left,
+ FieldInfo systemTime) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
+ // Both sides of the join key are the string-typed "currency" column, so both use StringFormatInfo.
+ conditionMap.put("2", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
+ new FieldInfo("currency", "1", new StringFormatInfo()),
+ EqualOperator.getInstance(), new FieldInfo("currency", "2", new StringFormatInfo()))));
+ if (left) {
+ return new LeftOuterTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, systemTime);
+ }
+ return new InnerTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, systemTime);
+ }
+
+ /**
+ * Create the stream table environment used by every temporal join test:
+ * parallelism 1, checkpointing every 10000 ms, operator chaining disabled,
+ * blink planner in streaming mode.
+ *
+ * @return the stream table environment
+ */
+ private StreamTableEnvironment buildStreamTableEnv() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ return StreamTableEnvironment.create(env, settings);
+ }
+
+ /**
+ * Build the kafka -> (temporal join with mysql) -> kafka stream, parse it with
+ * {@link FlinkSqlParser} and assert that the parse result executes successfully.
+ *
+ * @param eventTime true to join on the event time attribute "order_time" using
+ *         {@code buildMySQLExtractNode()}, false to join on the process time attribute
+ *         "proc_time" using {@code buildMySQLExtractNode2()}
+ * @param left true for a left outer temporal join, false for an inner temporal join
+ * @throws Exception The exception may be thrown when executing
+ */
+ private void assertTemporalJoinExecutes(boolean eventTime, boolean left) throws Exception {
+ StreamTableEnvironment tableEnv = buildStreamTableEnv();
+ Node kafkaExtractNode = buildKafkaExtractNode();
+ Node mySqlExtractNode = eventTime ? buildMySQLExtractNode() : buildMySQLExtractNode2();
+ Node kafkaLoadNode = buildKafkaLoadNode();
+ FieldInfo systemTime = new FieldInfo(eventTime ? "order_time" : "proc_time");
+ StreamInfo streamInfo = new StreamInfo("1",
+ Arrays.asList(kafkaExtractNode, mySqlExtractNode, kafkaLoadNode),
+ Collections.singletonList(
+ buildNodeRelation(Arrays.asList(kafkaExtractNode, mySqlExtractNode),
+ Collections.singletonList(kafkaLoadNode), left, systemTime))
+ );
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+ /**
+ * Test inner temporal join with event time for extract is mysql {@link MySqlExtractNode},
+ * {@link KafkaExtractNode} and load is kafka {@link KafkaLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testInnerTemporalJoinWithEventTimeParse() throws Exception {
+ assertTemporalJoinExecutes(true, false);
+ }
+
+ /**
+ * Test left temporal join with event time for extract is mysql {@link MySqlExtractNode},
+ * {@link KafkaExtractNode} and load is kafka {@link KafkaLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testLeftTemporalJoinWithEventTimeParse() throws Exception {
+ assertTemporalJoinExecutes(true, true);
+ }
+
+ /**
+ * Test inner temporal join with process time for extract is mysql {@link MySqlExtractNode},
+ * {@link KafkaExtractNode} and load is kafka {@link KafkaLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testInnerTemporalJoinWithProcessTimeParse() throws Exception {
+ assertTemporalJoinExecutes(false, false);
+ }
+
+ /**
+ * Test left temporal join with process time for extract is mysql {@link MySqlExtractNode},
+ * {@link KafkaExtractNode} and load is kafka {@link KafkaLoadNode}
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testLeftTemporalJoinWithProcessTimeParse() throws Exception {
+ assertTemporalJoinExecutes(false, true);
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
similarity index 95%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinSqlParseTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
index 403933159..36b3977e5 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
@@ -44,9 +44,9 @@ import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoin;
-import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoin;
-import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoin;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -61,7 +61,7 @@ import java.util.stream.Collectors;
/**
* Test for {@link RedisExtractNode} and temporal join {@link FlinkSqlParser}
*/
-public class RedisTemporalJoinSqlParseTest extends AbstractTestBase {
+public class RedisTemporalJoinRelationSqlParseTest extends AbstractTestBase {
private MySqlExtractNode buildMySQLExtractNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new StringFormatInfo()),
@@ -127,7 +127,7 @@ public class RedisTemporalJoinSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private TemporalJoin buildNodeRelation(List<Node> inputs, List<Node> outputs, boolean left) {
+ private TemporalJoinRelation buildNodeRelation(List<Node> inputs, List<Node> outputs, boolean left) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -144,9 +144,10 @@ public class RedisTemporalJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("id", "1", new LongFormatInfo()),
EqualOperator.getInstance(), new FieldInfo("k", "5", new StringFormatInfo()))));
if (left) {
- return new LeftOuterTemporalJoin(inputIds, outputIds, conditionMap, new FieldInfo("proc_time"));
+ return new LeftOuterTemporalJoinRelationRelation(inputIds, outputIds, conditionMap,
+ new FieldInfo("proc_time"));
}
- return new InnerTemporalJoin(inputIds, outputIds, conditionMap, new FieldInfo("proc_time"));
+ return new InnerTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, new FieldInfo("proc_time"));
}
/**