You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/19 08:42:47 UTC

[inlong] branch master updated: [INLONG-5599][Sort] Add temporal join support for MySQL (#5600)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cfd05b53 [INLONG-5599][Sort] Add temporal join support for MySQL (#5600)
5cfd05b53 is described below

commit 5cfd05b5393de6789c6e7ebb3e3a27fdab5381ba
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Fri Aug 19 16:42:42 2022 +0800

    [INLONG-5599][Sort] Add temporal join support for MySQL (#5600)
---
 .../protocol/node/extract/MySqlExtractNode.java    | 164 ++++++++++--
 .../protocol/transformation/WatermarkField.java    |  16 +-
 ...java => InnerTemporalJoinRelationRelation.java} |   8 +-
 .../transformation/relation/JoinRelation.java      |   4 +-
 ... => LeftOuterTemporalJoinRelationRelation.java} |   4 +-
 .../transformation/relation/NodeRelation.java      |   4 +-
 ...TemporalJoin.java => TemporalJoinRelation.java} |  16 +-
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |   8 +-
 .../MySqlTemporalJoinRelationSqlParseTest.java     | 285 +++++++++++++++++++++
 ... => RedisTemporalJoinRelationSqlParseTest.java} |  15 +-
 10 files changed, 471 insertions(+), 53 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index 750b9b6e6..b1d0f2783 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -30,6 +30,7 @@ import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
 import org.apache.inlong.sort.protocol.Metadata;
+import org.apache.inlong.sort.protocol.enums.ExtractMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
@@ -77,35 +78,137 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
     @JsonInclude(Include.NON_NULL)
     @JsonProperty("serverTimeZone")
     private String serverTimeZone;
+    @Nonnull
+    @JsonProperty("extractMode")
+    private ExtractMode extractMode;
+    @JsonProperty("url")
+    private String url;
+
+    /**
+     * Constructor only used for {@link ExtractMode#CDC}
+     *
+     * @param id The id of node
+     * @param name The name of node
+     * @param fields The field list of node
+     * @param watermarkField The watermark field of node only used for {@link ExtractMode#CDC}
+     * @param properties The properties connect to mysql
+     * @param primaryKey The primary key connect to mysql
+     * @param tableNames The table names connect to mysql
+     * @param hostname The hostname connect to mysql only used for {@link ExtractMode#CDC}
+     * @param username The username connect to mysql
+     * @param password The password connect to mysql
+     * @param database The database connect to mysql only used for {@link ExtractMode#CDC}
+     * @param port The port connect to mysql only used for {@link ExtractMode#CDC}
+     * @param serverId The server id connect to mysql only used for {@link ExtractMode#CDC}
+     * @param incrementalSnapshotEnabled The incremental snapshot enabled connect to mysql
+     *         only used for {@link ExtractMode#CDC}
+     * @param serverTimeZone The server time zone connect to mysql only used for {@link ExtractMode#CDC}
+     */
+    public MySqlExtractNode(@Nonnull @JsonProperty("id") String id,
+            @Nonnull @JsonProperty("name") String name,
+            @Nonnull @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+            @Nullable @JsonProperty("properties") Map<String, String> properties,
+            @Nullable @JsonProperty("primaryKey") String primaryKey,
+            @Nonnull @JsonProperty("tableNames") List<String> tableNames,
+            @Nonnull @JsonProperty("hostname") String hostname,
+            @Nonnull @JsonProperty("username") String username,
+            @Nonnull @JsonProperty("password") String password,
+            @Nonnull @JsonProperty("database") String database,
+            @Nullable @JsonProperty("port") Integer port,
+            @Nullable @JsonProperty("serverId") Integer serverId,
+            @Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
+            @Nullable @JsonProperty("serverTimeZone") String serverTimeZone) {
+        this(id, name, fields, watermarkField, properties, primaryKey, tableNames, hostname, username, password,
+                database, port, serverId, incrementalSnapshotEnabled, serverTimeZone, ExtractMode.CDC, null);
+    }
 
+    /**
+     * Constructor only used for {@link ExtractMode#SCAN}
+     *
+     * @param id The id of node
+     * @param name The name of node
+     * @param fields The field list of node
+     * @param properties The properties connect to mysql
+     * @param primaryKey The primary key connect to mysql
+     * @param tableNames The table names connect to mysql
+     * @param username The username connect to mysql
+     * @param password The password connect to mysql
+     * @param url The url connect to mysql only used for {@link ExtractMode#SCAN}
+     */
+    public MySqlExtractNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("properties") Map<String, String> properties,
+            @Nullable @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("tableNames") @Nonnull List<String> tableNames,
+            @JsonProperty("username") String username,
+            @JsonProperty("password") String password,
+            @Nullable @JsonProperty("url") String url) {
+        this(id, name, fields, null, properties, primaryKey, tableNames, null, username,
+                password, null, null, null, null, null,
+                ExtractMode.SCAN, url);
+    }
+
+    /**
+     * Base constructor
+     *
+     * @param id The id of node
+     * @param name The name of node
+     * @param fields The field list of node
+     * @param watermarkField The watermark field of node only used for {@link ExtractMode#CDC}
+     * @param properties The properties connect to mysql
+     * @param primaryKey The primary key connect to mysql
+     * @param tableNames The table names connect to mysql
+     * @param hostname The hostname connect to mysql only used for {@link ExtractMode#CDC}
+     * @param username The username connect to mysql
+     * @param password The password connect to mysql
+     * @param database The database connect to mysql only used for {@link ExtractMode#CDC}
+     * @param port The port connect to mysql only used for {@link ExtractMode#CDC}
+     * @param serverId The server id connect to mysql only used for {@link ExtractMode#CDC}
+     * @param incrementalSnapshotEnabled The incremental snapshot enabled connect to mysql
+     *         only used for {@link ExtractMode#CDC}
+     * @param serverTimeZone The server time zone connect to mysql only used for {@link ExtractMode#CDC}
+     * @param extractMode The extract mode used to connect to mysql, see {@link ExtractMode}
+     * @param url The url connect to mysql only used for {@link ExtractMode#SCAN}
+     */
     @JsonCreator
     public MySqlExtractNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @Nullable @JsonProperty("watermarkField") WatermarkField waterMarkField,
-            @JsonProperty("properties") Map<String, String> properties,
-            @JsonProperty("primaryKey") String primaryKey,
+            @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+            @Nullable @JsonProperty("properties") Map<String, String> properties,
+            @Nullable @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("tableNames") @Nonnull List<String> tableNames,
-            @JsonProperty("hostname") String hostname,
+            @Nullable @JsonProperty("hostname") String hostname,
             @JsonProperty("username") String username,
             @JsonProperty("password") String password,
-            @JsonProperty("database") String database,
-            @JsonProperty("port") Integer port,
-            @JsonProperty("serverId") Integer serverId,
-            @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
-            @JsonProperty("serverTimeZone") String serverTimeZone) {
-        super(id, name, fields, waterMarkField, properties);
+            @Nullable @JsonProperty("database") String database,
+            @Nullable @JsonProperty("port") Integer port,
+            @Nullable @JsonProperty("serverId") Integer serverId,
+            @Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
+            @Nullable @JsonProperty("serverTimeZone") String serverTimeZone,
+            @Nonnull @JsonProperty("extractMode") ExtractMode extractMode,
+            @Nullable @JsonProperty("url") String url) {
+        super(id, name, fields, watermarkField, properties);
         this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames is null");
         Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
-        this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
+        if (extractMode == ExtractMode.CDC) {
+            this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
+            this.database = Preconditions.checkNotNull(database, "database is null");
+        } else {
+            this.hostname = hostname;
+            this.database = database;
+            this.url = Preconditions.checkNotNull(url, "url is null");
+        }
         this.username = Preconditions.checkNotNull(username, "username is null");
         this.password = Preconditions.checkNotNull(password, "password is null");
-        this.database = Preconditions.checkNotNull(database, "database is null");
         this.primaryKey = primaryKey;
         this.port = port;
         this.serverId = serverId;
         this.incrementalSnapshotEnabled = incrementalSnapshotEnabled;
         this.serverTimeZone = serverTimeZone;
+        this.extractMode = extractMode;
     }
 
     @Override
@@ -121,23 +224,30 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "mysql-cdc-inlong");
-        options.put("hostname", hostname);
+        if (extractMode == ExtractMode.CDC) {
+            options.put("connector", "mysql-cdc-inlong");
+            options.put("hostname", hostname);
+            options.put("database-name", database);
+            if (port != null) {
+                options.put("port", port.toString());
+            }
+            if (serverId != null) {
+                options.put("server-id", serverId.toString());
+            }
+            if (incrementalSnapshotEnabled != null) {
+                options.put("scan.incremental.snapshot.enabled", incrementalSnapshotEnabled.toString());
+            }
+            if (serverTimeZone != null) {
+                options.put("server-time-zone", serverTimeZone);
+            }
+        } else {
+            options.put("connector", "jdbc-inlong");
+            options.put("url", url);
+            Preconditions.checkState(tableNames.size() == 1,
+                    "Only support one table for scan extract mode");
+        }
         options.put("username", username);
         options.put("password", password);
-        options.put("database-name", database);
-        if (port != null) {
-            options.put("port", port.toString());
-        }
-        if (serverId != null) {
-            options.put("server-id", serverId.toString());
-        }
-        if (incrementalSnapshotEnabled != null) {
-            options.put("scan.incremental.snapshot.enabled", incrementalSnapshotEnabled.toString());
-        }
-        if (serverTimeZone != null) {
-            options.put("server-time-zone", serverTimeZone);
-        }
         String formatTable = tableNames.size() == 1 ? tableNames.get(0) :
                 String.format("(%s)", StringUtils.join(tableNames, "|"));
         options.put("table-name", String.format("%s", formatTable));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java
index a36ed79bc..d8949c140 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/WatermarkField.java
@@ -24,6 +24,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
 
 import java.util.Arrays;
 import java.util.List;
@@ -48,12 +49,23 @@ public class WatermarkField implements TimeWindowFunction {
             @JsonProperty("interval") StringConstantParam interval,
             @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
         this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
-        this.interval = Preconditions.checkNotNull(interval, "interval is null");
-        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+        this.interval = interval;
+        this.timeUnit = timeUnit;
+    }
+
+    public WatermarkField(@JsonProperty("timeAttr") FieldInfo timeAttr) {
+        this(timeAttr, null, null);
     }
 
     @Override
     public String format() {
+        if (interval == null) {
+            return String.format("%s FOR %s AS %s", getName(), timeAttr.format(), timeAttr.format());
+        }
+        if (timeUnit == null) {
+            return String.format("%s FOR %s AS %s - INTERVAL %s %s", getName(), timeAttr.format(),
+                    timeAttr.format(), interval.format(), TimeUnit.SECOND.name());
+        }
         return String.format("%s FOR %s AS %s - INTERVAL %s %s", getName(), timeAttr.format(),
                 timeAttr.format(), interval.format(), timeUnit.format());
     }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoin.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
similarity index 93%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoin.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
index 9cd4fafa0..2b9b5782d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoin.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerTemporalJoinRelationRelation.java
@@ -31,16 +31,16 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Inner temporal join
+ * Inner temporal join relation
  */
 @JsonTypeName("innerTemporalJoin")
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class InnerTemporalJoin extends TemporalJoin {
+public class InnerTemporalJoinRelationRelation extends TemporalJoinRelation {
 
     /**
-     * LeftOuterTemporalJoin Constructor
+     * Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
@@ -50,7 +50,7 @@ public class InnerTemporalJoin extends TemporalJoin {
      * @param systemTime The system time for temporal join
      */
     @JsonCreator
-    public InnerTemporalJoin(
+    public InnerTemporalJoinRelationRelation(
             @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
index 9fd7687f1..c98e2be30 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
@@ -41,8 +41,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
         @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
         @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"),
-        @JsonSubTypes.Type(value = InnerTemporalJoin.class, name = "innerTemporalJoin"),
-        @JsonSubTypes.Type(value = LeftOuterTemporalJoin.class, name = "leftOuterTemporalJoin")
+        @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"),
+        @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
 })
 @EqualsAndHashCode(callSuper = true)
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoin.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
similarity index 94%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoin.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
index fcf4893f9..2a873381e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoin.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterTemporalJoinRelationRelation.java
@@ -37,7 +37,7 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class LeftOuterTemporalJoin extends TemporalJoin {
+public class LeftOuterTemporalJoinRelationRelation extends TemporalJoinRelation {
 
     /**
      * LeftOuterTemporalJoin Constructor
@@ -50,7 +50,7 @@ public class LeftOuterTemporalJoin extends TemporalJoin {
      * @param systemTime The system time for temporal join
      */
     @JsonCreator
-    public LeftOuterTemporalJoin(
+    public LeftOuterTemporalJoinRelationRelation(
             @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
index 5e3e6c0eb..43a3e725c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
@@ -40,8 +40,8 @@ import java.util.List;
         @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
         @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
         @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"),
-        @JsonSubTypes.Type(value = InnerTemporalJoin.class, name = "innerTemporalJoin"),
-        @JsonSubTypes.Type(value = LeftOuterTemporalJoin.class, name = "leftOuterTemporalJoin"),
+        @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"),
+        @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin"),
         @JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"),
         @JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation")
 })
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoin.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
similarity index 78%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoin.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
index c9c7a71d2..6a4fc1725 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoin.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/TemporalJoinRelation.java
@@ -23,6 +23,8 @@ import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
@@ -31,12 +33,20 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Temporal join base class
+ * Temporal join relation base class
  */
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = InnerTemporalJoinRelationRelation.class, name = "innerTemporalJoin"),
+        @JsonSubTypes.Type(value = LeftOuterTemporalJoinRelationRelation.class, name = "leftOuterTemporalJoin")
+})
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public abstract class TemporalJoin extends JoinRelation {
+public abstract class TemporalJoinRelation extends JoinRelation {
 
     @JsonProperty("systemTime")
     private FieldInfo systemTime;
@@ -52,7 +62,7 @@ public abstract class TemporalJoin extends JoinRelation {
      * @param systemTime The system time for temporal join
      */
     @JsonCreator
-    public TemporalJoin(
+    public TemporalJoinRelation(
             @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap,
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 2f453e062..faa7fdf71 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -48,7 +48,7 @@ import org.apache.inlong.sort.protocol.transformation.Function;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
-import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoin;
+import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -425,8 +425,8 @@ public class FlinkSqlParser implements Parser {
                 .append(tableNameAliasMap.get(relation.getInputs().get(0)));
         // Parse condition map of join and format condition to sql, such as on 1 = 1...
         Map<String, List<FilterFunction>> conditionMap = relation.getJoinConditionMap();
-        if (relation instanceof TemporalJoin) {
-            parseTemporalJoin((TemporalJoin) relation, nodeMap, tableNameAliasMap, conditionMap, sb);
+        if (relation instanceof TemporalJoinRelation) {
+            parseTemporalJoin((TemporalJoinRelation) relation, nodeMap, tableNameAliasMap, conditionMap, sb);
         } else {
             parseRegularJoin(relation, nodeMap, tableNameAliasMap, conditionMap, sb);
         }
@@ -454,7 +454,7 @@ public class FlinkSqlParser implements Parser {
         }
     }
 
-    private void parseTemporalJoin(TemporalJoin relation, Map<String, Node> nodeMap,
+    private void parseTemporalJoin(TemporalJoinRelation relation, Map<String, Node> nodeMap,
             Map<String, String> tableNameAliasMap, Map<String, List<FilterFunction>> conditionMap, StringBuilder sb) {
         if (StringUtils.isBlank(relation.getSystemTime().getNodeId())) {
             relation.getSystemTime().setNodeId(relation.getInputs().get(0));
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
new file mode 100644
index 000000000..69453fd95
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlTemporalJoinRelationSqlParseTest.java
@@ -0,0 +1,285 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Test for temporal join with {@link FlinkSqlParser} and {@link MySqlExtractNode}
+ */
+public class MySqlTemporalJoinRelationSqlParseTest extends AbstractTestBase {
+
+    private KafkaExtractNode buildKafkaExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("order_time", new TimestampFormatInfo(3)),
+                new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+        );
+        return new KafkaExtractNode("1", "kafka_input", fields,
+                new WatermarkField(new FieldInfo("order_time", new TimestampFormatInfo(3))),
+                null, "orders", "localhost:9092",
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "groupId", null);
+    }
+
+    private MySqlExtractNode buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("currency", new LongFormatInfo()),
+                new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("update_time", new TimestampFormatInfo(3))
+        );
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("2", "mysql_input", fields,
+                new WatermarkField(new FieldInfo("update_time", new TimestampFormatInfo(3))), map,
+                "currency", Collections.singletonList("currency_rates"), "localhost",
+                "inlong", "inlong", "inlong", null,
+                null, null, null);
+    }
+
+    private MySqlExtractNode buildMySQLExtractNode2() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("currency", new LongFormatInfo()),
+                new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("update_time", new TimestampFormatInfo(3)),
+                new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME)
+        );
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("2", "mysql_input", fields, map, "currency",
+                Collections.singletonList("currency_rates"), "inlong", "inlong",
+                "jdbc:mysql://localhost:3306/inlong");
+    }
+
+    private KafkaLoadNode buildKafkaLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("price", new DecimalFormatInfo(32, 2)),
+                new FieldInfo("currency", new StringFormatInfo()),
+                new FieldInfo("order_time", new TimestampFormatInfo(3)),
+                new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2))
+        );
+        List<FieldRelation> relations = Arrays.asList(
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                new FieldRelation(new FieldInfo("price", "1", new DecimalFormatInfo(32, 2)),
+                        new FieldInfo("price", new DecimalFormatInfo(32, 2))),
+                new FieldRelation(new FieldInfo("currency", "1", new StringFormatInfo()),
+                        new FieldInfo("currency", new StringFormatInfo())),
+                new FieldRelation(new FieldInfo("order_time", "1", new TimestampFormatInfo(3)),
+                        new FieldInfo("order_time", new TimestampFormatInfo(3))),
+                new FieldRelation(new FieldInfo("conversion_rate", "2", new DecimalFormatInfo(32, 2)),
+                        new FieldInfo("conversion_rate", new DecimalFormatInfo(32, 2)))
+        );
+        return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
+                null, "orders_output", "localhost:9092", new CanalJsonFormat(),
+                null, null, null);
+    }
+
+    /**
+     * Build the temporal join relation: left outer when {@code left} is true, inner otherwise
+     *
+     * @param inputs extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private TemporalJoinRelation buildNodeRelation(List<Node> inputs, List<Node> outputs, boolean left,
+            FieldInfo systemTime) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
+        conditionMap.put("2", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                new FieldInfo("currency", "1", new LongFormatInfo()),
+                EqualOperator.getInstance(), new FieldInfo("currency", "2", new StringFormatInfo()))));
+        if (left) {
+            return new LeftOuterTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, systemTime);
+        }
+        return new InnerTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, systemTime);
+    }
+
+    /**
+     * Test inner temporal join with event time where the extract nodes are {@link MySqlExtractNode} and
+     * {@link KafkaExtractNode} and the load node is {@link KafkaLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testInnerTemporalJoinWithEventTimeParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node kafkaExtractNode = buildKafkaExtractNode();
+        Node mySqlExtractNode2 = buildMySQLExtractNode();
+        Node kafkaLoadNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1",
+                Arrays.asList(kafkaExtractNode, mySqlExtractNode2, kafkaLoadNode),
+                Collections.singletonList(
+                        buildNodeRelation(Arrays.asList(kafkaExtractNode, mySqlExtractNode2),
+                                Collections.singletonList(kafkaLoadNode), false, new FieldInfo("order_time")))
+        );
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test left temporal join with event time where the extract nodes are {@link MySqlExtractNode} and
+     * {@link KafkaExtractNode} and the load node is {@link KafkaLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testLeftTemporalJoinWithEventTimeParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node kafkaExtractNode = buildKafkaExtractNode();
+        Node mySqlExtractNode2 = buildMySQLExtractNode();
+        Node kafkaLoadNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1",
+                Arrays.asList(kafkaExtractNode, mySqlExtractNode2, kafkaLoadNode),
+                Collections.singletonList(
+                        buildNodeRelation(Arrays.asList(kafkaExtractNode, mySqlExtractNode2),
+                                Collections.singletonList(kafkaLoadNode), true, new FieldInfo("order_time")))
+        );
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test inner temporal join with process time where the extract nodes are {@link MySqlExtractNode} and
+     * {@link KafkaExtractNode} and the load node is {@link KafkaLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testInnerTemporalJoinWithProcessTimeParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node kafkaExtractNode = buildKafkaExtractNode();
+        Node mySqlExtractNode2 = buildMySQLExtractNode2();
+        Node kafkaLoadNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1",
+                Arrays.asList(kafkaExtractNode, mySqlExtractNode2, kafkaLoadNode),
+                Collections.singletonList(
+                        buildNodeRelation(Arrays.asList(kafkaExtractNode, mySqlExtractNode2),
+                                Collections.singletonList(kafkaLoadNode), false, new FieldInfo("proc_time")))
+        );
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test left temporal join with process time where the extract nodes are {@link MySqlExtractNode} and
+     * {@link KafkaExtractNode} and the load node is {@link KafkaLoadNode}
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testLeftTemporalJoinWithProcessTimeParse() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node kafkaExtractNode = buildKafkaExtractNode();
+        Node mySqlExtractNode2 = buildMySQLExtractNode2();
+        Node kafkaLoadNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1",
+                Arrays.asList(kafkaExtractNode, mySqlExtractNode2, kafkaLoadNode),
+                Collections.singletonList(
+                        buildNodeRelation(Arrays.asList(kafkaExtractNode, mySqlExtractNode2),
+                                Collections.singletonList(kafkaLoadNode), true, new FieldInfo("proc_time")))
+        );
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
similarity index 95%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinSqlParseTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
index 403933159..36b3977e5 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RedisTemporalJoinRelationSqlParseTest.java
@@ -44,9 +44,9 @@ import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoin;
-import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoin;
-import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoin;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterTemporalJoinRelationRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.TemporalJoinRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -61,7 +61,7 @@ import java.util.stream.Collectors;
 /**
  * Test for {@link RedisExtractNode} and temporal join {@link FlinkSqlParser}
  */
-public class RedisTemporalJoinSqlParseTest extends AbstractTestBase {
+public class RedisTemporalJoinRelationSqlParseTest extends AbstractTestBase {
 
     private MySqlExtractNode buildMySQLExtractNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new StringFormatInfo()),
@@ -127,7 +127,7 @@ public class RedisTemporalJoinSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private TemporalJoin buildNodeRelation(List<Node> inputs, List<Node> outputs, boolean left) {
+    private TemporalJoinRelation buildNodeRelation(List<Node> inputs, List<Node> outputs, boolean left) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
         Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -144,9 +144,10 @@ public class RedisTemporalJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("id", "1", new LongFormatInfo()),
                 EqualOperator.getInstance(), new FieldInfo("k", "5", new StringFormatInfo()))));
         if (left) {
-            return new LeftOuterTemporalJoin(inputIds, outputIds, conditionMap, new FieldInfo("proc_time"));
+            return new LeftOuterTemporalJoinRelationRelation(inputIds, outputIds, conditionMap,
+                    new FieldInfo("proc_time"));
         }
-        return new InnerTemporalJoin(inputIds, outputIds, conditionMap, new FieldInfo("proc_time"));
+        return new InnerTemporalJoinRelationRelation(inputIds, outputIds, conditionMap, new FieldInfo("proc_time"));
     }
 
     /**