You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/25 06:48:09 UTC

[incubator-inlong] branch master updated: [INLONG-4364][Manager] Optimize Sort protocol and Pulsar extract node (#4365)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cf14c3219 [INLONG-4364][Manager] Optimize Sort protocol and Pulsar extract node (#4365)
cf14c3219 is described below

commit cf14c32197ce0d2ec7bac4ff8e16e63fb4f91b71
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed May 25 14:48:04 2022 +0800

    [INLONG-4364][Manager] Optimize Sort protocol and Pulsar extract node (#4365)
---
 .../manager/client/api/source/AgentFileSource.java |  2 +-
 .../manager/client/api/source/AutoPushSource.java  |  2 +-
 .../client/api/source/MySQLBinlogSource.java       |  2 +-
 .../manager/client/api/source/MySQLSource.java     |  2 +-
 .../manager/client/api/source/PostgresSource.java  |  2 +-
 .../api/util/InlongStreamSourceTransfer.java       | 19 +++++++---
 .../inlong/manager/common/enums/DataFormat.java    |  1 +
 .../pojo/source/pulsar/PulsarSourceResponse.java   |  2 +
 .../inlong/manager/plugin/flink/FlinkService.java  |  2 +-
 .../service/sort/CreateSortConfigListenerV2.java   |  1 +
 .../sort/CreateStreamSortConfigListener.java       |  1 +
 .../service/sort/util/ExtractNodeUtils.java        | 20 ++++++----
 ...nStartupMode.java => KafkaScanStartupMode.java} | 14 +++----
 .../sort/protocol/enums/PulsarScanStartupMode.java | 43 ++++++++++------------
 .../protocol/node/extract/KafkaExtractNode.java    | 12 +++---
 .../node/extract/KafkaExtractNodeTest.java         |  4 +-
 .../sort/parser/DistinctNodeSqlParseTest.java      |  8 ++--
 .../inlong/sort/parser/FlinkSqlParserTest.java     |  4 +-
 .../sort/parser/FullOuterJoinSqlParseTest.java     |  8 ++--
 .../parser/InnerJoinRelationShipSqlParseTest.java  |  8 ++--
 .../sort/parser/LeftOuterJoinSqlParseTest.java     |  8 ++--
 .../inlong/sort/parser/MetaFieldSyncTest.java      |  4 +-
 .../sort/parser/RightOuterJoinSqlParseTest.java    |  8 ++--
 23 files changed, 94 insertions(+), 83 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
index 3fb83c79d..236be6016 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
@@ -44,7 +44,7 @@ public class AgentFileSource extends StreamSource {
     private SyncType syncType = SyncType.INCREMENT;
 
     @ApiModelProperty("Data format type")
-    private DataFormat dataFormat = DataFormat.NONE;
+    private DataFormat dataFormat = DataFormat.JSON;
 
     @ApiModelProperty(value = "Agent IP address", required = true)
     private String ip;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
index af3179653..62f63c1ce 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
@@ -44,7 +44,7 @@ public class AutoPushSource extends StreamSource {
     private SyncType syncType = SyncType.INCREMENT;
 
     @ApiModelProperty("Data format type")
-    private DataFormat dataFormat = DataFormat.NONE;
+    private DataFormat dataFormat = DataFormat.JSON;
 
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index e00a65b22..c84f72820 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -47,7 +47,7 @@ public class MySQLBinlogSource extends StreamSource {
     private SyncType syncType;
 
     @ApiModelProperty("Data format type for binlog")
-    private DataFormat dataFormat = DataFormat.NONE;
+    private DataFormat dataFormat = DataFormat.DEBEZIUM_JSON;
 
     @ApiModelProperty("Auth for binlog")
     private DefaultAuthentication authentication;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index e205acfda..d1e19de04 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -65,5 +65,5 @@ public class MySQLSource extends StreamSource {
     private String dataSql;
 
     @ApiModelProperty("Data format type of source")
-    private DataFormat dataFormat;
+    private DataFormat dataFormat = DataFormat.CSV;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
index c55b0cb65..f7b2bb0a6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
@@ -45,7 +45,7 @@ public class PostgresSource extends StreamSource {
     private SyncType syncType = SyncType.INCREMENT;
 
     @ApiModelProperty("Data format type")
-    private DataFormat dataFormat = DataFormat.NONE;
+    private DataFormat dataFormat = DataFormat.CSV;
 
     @ApiModelProperty("Db server username")
     private String username;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index c051d671b..7cabe1703 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -20,12 +20,12 @@ package org.apache.inlong.manager.client.api.util;
 import com.google.common.base.Joiner;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.source.AgentFileSource;
 import org.apache.inlong.manager.client.api.source.AutoPushSource;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
 import org.apache.inlong.manager.client.api.source.PostgresSource;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -118,7 +118,8 @@ public class InlongStreamSourceTransfer {
         MySQLBinlogSource binlogSource = new MySQLBinlogSource();
         binlogSource.setSourceName(response.getSourceName());
         binlogSource.setHostname(response.getHostname());
-        binlogSource.setDataFormat(DataFormat.NONE);
+        DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+        binlogSource.setDataFormat(dataFormat);
         binlogSource.setPort(response.getPort());
         binlogSource.setAgentIp(response.getAgentIp());
         binlogSource.setState(State.parseByStatus(response.getStatus()));
@@ -148,7 +149,8 @@ public class InlongStreamSourceTransfer {
         AgentFileSource fileSource = new AgentFileSource();
         fileSource.setSourceName(response.getSourceName());
         fileSource.setState(State.parseByStatus(response.getStatus()));
-        fileSource.setDataFormat(DataFormat.NONE);
+        DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+        fileSource.setDataFormat(dataFormat);
         fileSource.setPattern(response.getPattern());
         fileSource.setIp(response.getIp());
         fileSource.setTimeOffset(response.getTimeOffset());
@@ -160,7 +162,8 @@ public class InlongStreamSourceTransfer {
         AutoPushSource autoPushSource = new AutoPushSource();
         autoPushSource.setSourceName(response.getSourceName());
         autoPushSource.setState(State.parseByStatus(response.getStatus()));
-        autoPushSource.setDataFormat(DataFormat.NONE);
+        DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+        autoPushSource.setDataFormat(dataFormat);
         autoPushSource.setDataProxyGroup(response.getDataProxyGroup());
         autoPushSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
         return autoPushSource;
@@ -170,7 +173,8 @@ public class InlongStreamSourceTransfer {
         PostgresSource postgresSource = new PostgresSource();
         postgresSource.setSourceName(response.getSourceName());
         postgresSource.setState(State.parseByStatus(response.getStatus()));
-        postgresSource.setDataFormat(DataFormat.NONE);
+        DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+        postgresSource.setDataFormat(dataFormat);
         postgresSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
 
         postgresSource.setDbName(response.getDatabase());
@@ -227,6 +231,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
         sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
         sourceRequest.setAllMigration(binlogSource.isAllMigration());
+        sourceRequest.setSerializationType(binlogSource.getDataFormat().getName());
         sourceRequest.setPrimaryKey(binlogSource.getPrimaryKey());
         if (CollectionUtils.isNotEmpty(binlogSource.getDbNames())) {
             String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
@@ -249,6 +254,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         sourceRequest.setSourceType(fileSource.getSourceType().getType());
+        sourceRequest.setSerializationType(fileSource.getDataFormat().getName());
         if (StringUtils.isEmpty(fileSource.getIp())) {
             throw new IllegalArgumentException(
                     String.format("AgentIp should not be null for fileSource=%s", fileSource));
@@ -275,6 +281,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         sourceRequest.setSourceType(source.getSourceType().getType());
         sourceRequest.setDataProxyGroup(source.getDataProxyGroup());
+        sourceRequest.setSerializationType(source.getDataFormat().getName());
         sourceRequest.setFieldList(InlongStreamTransfer.createStreamFields(source.getFields(), streamInfo));
         return sourceRequest;
     }
@@ -295,7 +302,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setSchema(source.getSchema());
         sourceRequest.setTableNameList(source.getTableNameList());
         sourceRequest.setUsername(source.getUsername());
-
+        sourceRequest.setSerializationType(source.getDataFormat().getName());
         sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         sourceRequest.setSourceType(source.getSourceType().getType());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
index 6ae1cf133..50e274737 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
@@ -30,6 +30,7 @@ public enum DataFormat {
     AVRO("avro"),
     CANAL("canal"),
     JSON("json"),
+    DEBEZIUM_JSON("debezium_json"),
     NONE("none");
 
     @Getter
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
index d98d4dd59..2626e3dd3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
@@ -56,6 +56,8 @@ public class PulsarSourceResponse extends SourceResponse {
             + "Available options are earliest, latest, external-subscription, and specific-offsets.")
     private String scanStartupMode = "earliest";
 
+    private boolean isInlongComponent = false;
+
     public PulsarSourceResponse() {
         this.setSourceType(SourceType.PULSAR.getType());
     }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 31e5b2a5e..d180c6bc7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -267,7 +267,7 @@ public class FlinkService {
         List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
-        list.add("-dataflow.info.file");
+        list.add("-group.info.file");
         list.add(flinkInfo.getLocalConfPath());
         list.add("-checkpoint.interval");
         list.add("60000");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index b14314be8..fb443eced 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -139,6 +139,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
             pulsarSourceResponse.setTopic(streamInfo.getMqResource());
             pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
             pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+            pulsarSourceResponse.setInlongComponent(true);
             List<SourceResponse> sourceResponses = sourceService.listSource(groupInfo.getInlongGroupId(),
                     streamInfo.getInlongStreamId());
             for (SourceResponse sourceResponse : sourceResponses) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 4ce935da3..12c21d0f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -131,6 +131,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
         pulsarSourceResponse.setTopic(streamInfo.getMqResource());
         pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
         pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+        pulsarSourceResponse.setInlongComponent(true);
         List<SourceResponse> sourceResponses = streamSourceService.listSource(groupInfo.getInlongGroupId(),
                 streamInfo.getInlongStreamId());
         for (SourceResponse sourceResponse : sourceResponses) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 75f8998a0..829a40f71 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -29,21 +29,23 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
-import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 
 import java.util.List;
@@ -175,14 +177,14 @@ public class ExtractNodeUtils {
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
         }
         KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSourceResponse.getAutoOffsetReset());
-        ScanStartupMode startupMode = null;
+        KafkaScanStartupMode startupMode = null;
         switch (kafkaOffset) {
             case EARLIEST:
-                startupMode = ScanStartupMode.EARLIEST_OFFSET;
+                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
                 break;
             case LATEST:
             default:
-                startupMode = ScanStartupMode.LATEST_OFFSET;
+                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
         }
         final String primaryKey = kafkaSourceResponse.getPrimaryKey();
         String groupId = kafkaSourceResponse.getGroupId();
@@ -236,7 +238,11 @@ public class ExtractNodeUtils {
             default:
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
         }
-        ScanStartupMode startupMode = ScanStartupMode.forName(pulsarSourceResponse.getScanStartupMode());
+        if (pulsarSourceResponse.isInlongComponent()) {
+            Format innerFormat = format;
+            format = new InLongMsgFormat(innerFormat, false);
+        }
+        PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSourceResponse.getScanStartupMode());
         final String primaryKey = pulsarSourceResponse.getPrimaryKey();
         final String serviceUrl = pulsarSourceResponse.getServiceUrl();
         final String adminUrl = pulsarSourceResponse.getAdminUrl();
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java
similarity index 79%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java
index cb9cb943c..bd21aa1fe 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java
@@ -23,13 +23,11 @@ import java.util.Locale;
 /**
  * kafka consumer scan startup mode enum
  */
-public enum ScanStartupMode {
+public enum KafkaScanStartupMode {
     EARLIEST_OFFSET("earliest-offset"),
-    LATEST_OFFSET("latest-offset"),
-    EXTERNAL_SUBSCRIPTION("external-subscription"),
-    SPECIFIC_OFFSETS("specific-offsets");
+    LATEST_OFFSET("latest-offset");
 
-    ScanStartupMode(String value) {
+    KafkaScanStartupMode(String value) {
         this.value = value;
     }
 
@@ -39,13 +37,13 @@ public enum ScanStartupMode {
         return value;
     }
 
-    public static ScanStartupMode forName(String name) {
-        for (ScanStartupMode startupMode : values()) {
+    public static KafkaScanStartupMode forName(String name) {
+        for (KafkaScanStartupMode startupMode : values()) {
             if (startupMode.getValue().equals(name.toLowerCase(Locale.ROOT))) {
                 return startupMode;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupported ScanStartupMode=%s for Inlong", name));
+        throw new IllegalArgumentException(String.format("Unsupported KafkaScanStartupMode=%s for Inlong", name));
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/PulsarScanStartupMode.java
similarity index 57%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/PulsarScanStartupMode.java
index 6ae1cf133..0ba92d6b9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/PulsarScanStartupMode.java
@@ -15,39 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.enums;
-
-import lombok.Getter;
+package org.apache.inlong.sort.protocol.enums;
 
 import java.util.Locale;
 
-/**
- * Enum of data format.
- */
-public enum DataFormat {
+public enum PulsarScanStartupMode {
 
-    CSV("csv"),
-    AVRO("avro"),
-    CANAL("canal"),
-    JSON("json"),
-    NONE("none");
+    EARLIEST("earliest"),
+    LATEST("latest"),
+    EXTERNAL_SUBSCRIPTION("external-subscription"),
+    SPECIFIC_OFFSETS("specific-offsets");
 
-    @Getter
-    private final String name;
+    PulsarScanStartupMode(String value) {
+        this.value = value;
+    }
 
-    DataFormat(String name) {
-        this.name = name;
+    private final String value;
+
+    public String getValue() {
+        return value;
     }
 
-    /**
-     * Get dataformat for inlong by name.
-     */
-    public static DataFormat forName(String name) {
-        for (DataFormat dataFormat : values()) {
-            if (dataFormat.getName().equals(name.toLowerCase(Locale.ROOT))) {
-                return dataFormat;
+    public static PulsarScanStartupMode forName(String name) {
+        for (PulsarScanStartupMode startupMode : values()) {
+            if (startupMode.getValue().equals(name.toLowerCase(Locale.ROOT))) {
+                return startupMode;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupported DataFormat=%s for Inlong", name));
+        throw new IllegalArgumentException(String.format("Unsupported PulsarScanStartupMode=%s for Inlong", name));
     }
+
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 4465a5e71..0677cd69c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.KafkaConstant;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
@@ -63,7 +63,7 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
     private Format format;
 
     @JsonProperty("scanStartupMode")
-    private ScanStartupMode scanStartupMode;
+    private KafkaScanStartupMode kafkaScanStartupMode;
 
     @JsonProperty("primaryKey")
     private String primaryKey;
@@ -80,14 +80,14 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
             @Nonnull @JsonProperty("topic") String topic,
             @Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
             @Nonnull @JsonProperty("format") Format format,
-            @JsonProperty("scanStartupMode") ScanStartupMode scanStartupMode,
+            @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
             @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("groupId") String groupId) {
         super(id, name, fields, watermarkField, properties);
         this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
         this.format = Preconditions.checkNotNull(format, "kafka format is empty");
-        this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode, "kafka scanStartupMode is empty");
+        this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
         this.primaryKey = primaryKey;
         this.groupId = groupId;
     }
@@ -105,7 +105,7 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
         if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
             if (StringUtils.isEmpty(this.primaryKey)) {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-                options.put(KafkaConstant.SCAN_STARTUP_MODE, scanStartupMode.getValue());
+                options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
                 options.putAll(format.generateOptions(false));
             } else {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
@@ -113,7 +113,7 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
             }
         } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-            options.put(KafkaConstant.SCAN_STARTUP_MODE, scanStartupMode.getValue());
+            options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
             options.putAll(format.generateOptions(false));
         } else {
             throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index e0a639fd8..4315ef4ae 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 
@@ -39,6 +39,6 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<Node> {
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv",
-                "localhost:9092", new CsvFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                "localhost:9092", new CsvFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index 94085f7c8..5f1192922 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -33,7 +33,7 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -68,7 +68,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new BuiltInFieldInfo("proctime", new TimestampFormatInfo(), BuiltInField.PROCESS_TIME));
         return new KafkaExtractNode("1", "kafka_input", fields, null,
                 null, "topic_input", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -84,7 +84,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new TimeUnitConstantParam(TimeUnit.SECOND));
         return new KafkaExtractNode("1", "kafka_input", fields, wk,
                 null, "topic_input", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -96,7 +96,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input", fields, null,
                 null, "topic_input", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     private KafkaLoadNode buildKafkaLoadNode() {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index 14ec958d9..b81a54c92 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -87,7 +87,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 new StringConstantParam("5"),
                 new TimeUnitConstantParam(TimeUnit.SECOND));
         return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
-                "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                "localhost:9092", new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     private KafkaLoadNode buildKafkaNode(String id) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
index d01075ada..cd629ee6c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -75,7 +75,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -89,7 +89,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()));
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -104,7 +104,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
index 05eeea8dd..7fae01782 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -75,7 +75,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -89,7 +89,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()));
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -104,7 +104,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
index ba61be859..5758372fc 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
@@ -32,7 +32,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -76,7 +76,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -90,7 +90,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()));
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -105,7 +105,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index ebf5f1f50..252ca4f0e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -36,7 +36,7 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -187,7 +187,7 @@ public class MetaFieldSyncTest extends AbstractTestBase {
         );
         return new KafkaExtractNode("3", "kafka_input", fields,
                 null, null, "topic1", "localhost:9092",
-                new CanalJsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new CanalJsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
index f5462136d..51af295fd 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -75,7 +75,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -89,7 +89,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()));
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
                 null, "groupId");
     }
 
@@ -104,7 +104,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+                new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**