You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/25 06:48:09 UTC
[incubator-inlong] branch master updated: [INLONG-4364][Manager] Optimize Sort protocal and Pulsar extract node (#4365)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cf14c3219 [INLONG-4364][Manager] Optimize Sort protocal and Pulsar extract node (#4365)
cf14c3219 is described below
commit cf14c32197ce0d2ec7bac4ff8e16e63fb4f91b71
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed May 25 14:48:04 2022 +0800
[INLONG-4364][Manager] Optimize Sort protocal and Pulsar extract node (#4365)
---
.../manager/client/api/source/AgentFileSource.java | 2 +-
.../manager/client/api/source/AutoPushSource.java | 2 +-
.../client/api/source/MySQLBinlogSource.java | 2 +-
.../manager/client/api/source/MySQLSource.java | 2 +-
.../manager/client/api/source/PostgresSource.java | 2 +-
.../api/util/InlongStreamSourceTransfer.java | 19 +++++++---
.../inlong/manager/common/enums/DataFormat.java | 1 +
.../pojo/source/pulsar/PulsarSourceResponse.java | 2 +
.../inlong/manager/plugin/flink/FlinkService.java | 2 +-
.../service/sort/CreateSortConfigListenerV2.java | 1 +
.../sort/CreateStreamSortConfigListener.java | 1 +
.../service/sort/util/ExtractNodeUtils.java | 20 ++++++----
...nStartupMode.java => KafkaScanStartupMode.java} | 14 +++----
.../sort/protocol/enums/PulsarScanStartupMode.java | 43 ++++++++++------------
.../protocol/node/extract/KafkaExtractNode.java | 12 +++---
.../node/extract/KafkaExtractNodeTest.java | 4 +-
.../sort/parser/DistinctNodeSqlParseTest.java | 8 ++--
.../inlong/sort/parser/FlinkSqlParserTest.java | 4 +-
.../sort/parser/FullOuterJoinSqlParseTest.java | 8 ++--
.../parser/InnerJoinRelationShipSqlParseTest.java | 8 ++--
.../sort/parser/LeftOuterJoinSqlParseTest.java | 8 ++--
.../inlong/sort/parser/MetaFieldSyncTest.java | 4 +-
.../sort/parser/RightOuterJoinSqlParseTest.java | 8 ++--
23 files changed, 94 insertions(+), 83 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
index 3fb83c79d..236be6016 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
@@ -44,7 +44,7 @@ public class AgentFileSource extends StreamSource {
private SyncType syncType = SyncType.INCREMENT;
@ApiModelProperty("Data format type")
- private DataFormat dataFormat = DataFormat.NONE;
+ private DataFormat dataFormat = DataFormat.JSON;
@ApiModelProperty(value = "Agent IP address", required = true)
private String ip;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
index af3179653..62f63c1ce 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
@@ -44,7 +44,7 @@ public class AutoPushSource extends StreamSource {
private SyncType syncType = SyncType.INCREMENT;
@ApiModelProperty("Data format type")
- private DataFormat dataFormat = DataFormat.NONE;
+ private DataFormat dataFormat = DataFormat.JSON;
@ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
private String dataProxyGroup;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index e00a65b22..c84f72820 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -47,7 +47,7 @@ public class MySQLBinlogSource extends StreamSource {
private SyncType syncType;
@ApiModelProperty("Data format type for binlog")
- private DataFormat dataFormat = DataFormat.NONE;
+ private DataFormat dataFormat = DataFormat.DEBEZIUM_JSON;
@ApiModelProperty("Auth for binlog")
private DefaultAuthentication authentication;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index e205acfda..d1e19de04 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -65,5 +65,5 @@ public class MySQLSource extends StreamSource {
private String dataSql;
@ApiModelProperty("Data format type of source")
- private DataFormat dataFormat;
+ private DataFormat dataFormat = DataFormat.CSV;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
index c55b0cb65..f7b2bb0a6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
@@ -45,7 +45,7 @@ public class PostgresSource extends StreamSource {
private SyncType syncType = SyncType.INCREMENT;
@ApiModelProperty("Data format type")
- private DataFormat dataFormat = DataFormat.NONE;
+ private DataFormat dataFormat = DataFormat.CSV;
@ApiModelProperty("Db server username")
private String username;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index c051d671b..7cabe1703 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -20,12 +20,12 @@ package org.apache.inlong.manager.client.api.util;
import com.google.common.base.Joiner;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.source.AgentFileSource;
import org.apache.inlong.manager.client.api.source.AutoPushSource;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
import org.apache.inlong.manager.client.api.source.PostgresSource;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -118,7 +118,8 @@ public class InlongStreamSourceTransfer {
MySQLBinlogSource binlogSource = new MySQLBinlogSource();
binlogSource.setSourceName(response.getSourceName());
binlogSource.setHostname(response.getHostname());
- binlogSource.setDataFormat(DataFormat.NONE);
+ DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+ binlogSource.setDataFormat(dataFormat);
binlogSource.setPort(response.getPort());
binlogSource.setAgentIp(response.getAgentIp());
binlogSource.setState(State.parseByStatus(response.getStatus()));
@@ -148,7 +149,8 @@ public class InlongStreamSourceTransfer {
AgentFileSource fileSource = new AgentFileSource();
fileSource.setSourceName(response.getSourceName());
fileSource.setState(State.parseByStatus(response.getStatus()));
- fileSource.setDataFormat(DataFormat.NONE);
+ DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+ fileSource.setDataFormat(dataFormat);
fileSource.setPattern(response.getPattern());
fileSource.setIp(response.getIp());
fileSource.setTimeOffset(response.getTimeOffset());
@@ -160,7 +162,8 @@ public class InlongStreamSourceTransfer {
AutoPushSource autoPushSource = new AutoPushSource();
autoPushSource.setSourceName(response.getSourceName());
autoPushSource.setState(State.parseByStatus(response.getStatus()));
- autoPushSource.setDataFormat(DataFormat.NONE);
+ DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+ autoPushSource.setDataFormat(dataFormat);
autoPushSource.setDataProxyGroup(response.getDataProxyGroup());
autoPushSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
return autoPushSource;
@@ -170,7 +173,8 @@ public class InlongStreamSourceTransfer {
PostgresSource postgresSource = new PostgresSource();
postgresSource.setSourceName(response.getSourceName());
postgresSource.setState(State.parseByStatus(response.getStatus()));
- postgresSource.setDataFormat(DataFormat.NONE);
+ DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
+ postgresSource.setDataFormat(dataFormat);
postgresSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
postgresSource.setDbName(response.getDatabase());
@@ -227,6 +231,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
sourceRequest.setAllMigration(binlogSource.isAllMigration());
+ sourceRequest.setSerializationType(binlogSource.getDataFormat().getName());
sourceRequest.setPrimaryKey(binlogSource.getPrimaryKey());
if (CollectionUtils.isNotEmpty(binlogSource.getDbNames())) {
String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
@@ -249,6 +254,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
sourceRequest.setSourceType(fileSource.getSourceType().getType());
+ sourceRequest.setSerializationType(fileSource.getDataFormat().getName());
if (StringUtils.isEmpty(fileSource.getIp())) {
throw new IllegalArgumentException(
String.format("AgentIp should not be null for fileSource=%s", fileSource));
@@ -275,6 +281,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
sourceRequest.setSourceType(source.getSourceType().getType());
sourceRequest.setDataProxyGroup(source.getDataProxyGroup());
+ sourceRequest.setSerializationType(source.getDataFormat().getName());
sourceRequest.setFieldList(InlongStreamTransfer.createStreamFields(source.getFields(), streamInfo));
return sourceRequest;
}
@@ -295,7 +302,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setSchema(source.getSchema());
sourceRequest.setTableNameList(source.getTableNameList());
sourceRequest.setUsername(source.getUsername());
-
+ sourceRequest.setSerializationType(source.getDataFormat().getName());
sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
sourceRequest.setSourceType(source.getSourceType().getType());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
index 6ae1cf133..50e274737 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
@@ -30,6 +30,7 @@ public enum DataFormat {
AVRO("avro"),
CANAL("canal"),
JSON("json"),
+ DEBEZIUM_JSON("debezium_json"),
NONE("none");
@Getter
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
index d98d4dd59..2626e3dd3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
@@ -56,6 +56,8 @@ public class PulsarSourceResponse extends SourceResponse {
+ "Available options are earliest, latest, external-subscription, and specific-offsets.")
private String scanStartupMode = "earliest";
+ private boolean isInlongComponent = false;
+
public PulsarSourceResponse() {
this.setSourceType(SourceType.PULSAR.getType());
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 31e5b2a5e..d180c6bc7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -267,7 +267,7 @@ public class FlinkService {
List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
- list.add("-dataflow.info.file");
+ list.add("-group.info.file");
list.add(flinkInfo.getLocalConfPath());
list.add("-checkpoint.interval");
list.add("60000");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index b14314be8..fb443eced 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -139,6 +139,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
pulsarSourceResponse.setTopic(streamInfo.getMqResource());
pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+ pulsarSourceResponse.setInlongComponent(true);
List<SourceResponse> sourceResponses = sourceService.listSource(groupInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());
for (SourceResponse sourceResponse : sourceResponses) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 4ce935da3..12c21d0f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -131,6 +131,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
pulsarSourceResponse.setTopic(streamInfo.getMqResource());
pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+ pulsarSourceResponse.setInlongComponent(true);
List<SourceResponse> sourceResponses = streamSourceService.listSource(groupInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());
for (SourceResponse sourceResponse : sourceResponses) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 75f8998a0..829a40f71 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -29,21 +29,23 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
-import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import java.util.List;
@@ -175,14 +177,14 @@ public class ExtractNodeUtils {
throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
}
KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSourceResponse.getAutoOffsetReset());
- ScanStartupMode startupMode = null;
+ KafkaScanStartupMode startupMode = null;
switch (kafkaOffset) {
case EARLIEST:
- startupMode = ScanStartupMode.EARLIEST_OFFSET;
+ startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
break;
case LATEST:
default:
- startupMode = ScanStartupMode.LATEST_OFFSET;
+ startupMode = KafkaScanStartupMode.LATEST_OFFSET;
}
final String primaryKey = kafkaSourceResponse.getPrimaryKey();
String groupId = kafkaSourceResponse.getGroupId();
@@ -236,7 +238,11 @@ public class ExtractNodeUtils {
default:
throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
}
- ScanStartupMode startupMode = ScanStartupMode.forName(pulsarSourceResponse.getScanStartupMode());
+ if (pulsarSourceResponse.isInlongComponent()) {
+ Format innerFormat = format;
+ format = new InLongMsgFormat(innerFormat, false);
+ }
+ PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSourceResponse.getScanStartupMode());
final String primaryKey = pulsarSourceResponse.getPrimaryKey();
final String serviceUrl = pulsarSourceResponse.getServiceUrl();
final String adminUrl = pulsarSourceResponse.getAdminUrl();
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java
similarity index 79%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java
index cb9cb943c..bd21aa1fe 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/KafkaScanStartupMode.java
@@ -23,13 +23,11 @@ import java.util.Locale;
/**
* kafka consumer scan startup mode enum
*/
-public enum ScanStartupMode {
+public enum KafkaScanStartupMode {
EARLIEST_OFFSET("earliest-offset"),
- LATEST_OFFSET("latest-offset"),
- EXTERNAL_SUBSCRIPTION("external-subscription"),
- SPECIFIC_OFFSETS("specific-offsets");
+ LATEST_OFFSET("latest-offset");
- ScanStartupMode(String value) {
+ KafkaScanStartupMode(String value) {
this.value = value;
}
@@ -39,13 +37,13 @@ public enum ScanStartupMode {
return value;
}
- public static ScanStartupMode forName(String name) {
- for (ScanStartupMode startupMode : values()) {
+ public static KafkaScanStartupMode forName(String name) {
+ for (KafkaScanStartupMode startupMode : values()) {
if (startupMode.getValue().equals(name.toLowerCase(Locale.ROOT))) {
return startupMode;
}
}
- throw new IllegalArgumentException(String.format("Unsupported ScanStartupMode=%s for Inlong", name));
+ throw new IllegalArgumentException(String.format("Unsupported KafkaScanStartupMode=%s for Inlong", name));
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/PulsarScanStartupMode.java
similarity index 57%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
copy to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/PulsarScanStartupMode.java
index 6ae1cf133..0ba92d6b9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/PulsarScanStartupMode.java
@@ -15,39 +15,34 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.enums;
-
-import lombok.Getter;
+package org.apache.inlong.sort.protocol.enums;
import java.util.Locale;
-/**
- * Enum of data format.
- */
-public enum DataFormat {
+public enum PulsarScanStartupMode {
- CSV("csv"),
- AVRO("avro"),
- CANAL("canal"),
- JSON("json"),
- NONE("none");
+ EARLIEST("earliest"),
+ LATEST("latest"),
+ EXTERNAL_SUBSCRIPTION("external-subscription"),
+ SPECIFIC_OFFSETS("specific-offsets");
- @Getter
- private final String name;
+ PulsarScanStartupMode(String value) {
+ this.value = value;
+ }
- DataFormat(String name) {
- this.name = name;
+ private final String value;
+
+ public String getValue() {
+ return value;
}
- /**
- * Get dataformat for inlong by name.
- */
- public static DataFormat forName(String name) {
- for (DataFormat dataFormat : values()) {
- if (dataFormat.getName().equals(name.toLowerCase(Locale.ROOT))) {
- return dataFormat;
+ public static PulsarScanStartupMode forName(String name) {
+ for (PulsarScanStartupMode startupMode : values()) {
+ if (startupMode.getValue().equals(name.toLowerCase(Locale.ROOT))) {
+ return startupMode;
}
}
- throw new IllegalArgumentException(String.format("Unsupported DataFormat=%s for Inlong", name));
+ throw new IllegalArgumentException(String.format("Unsupported PulsarScanStartupMode=%s for Inlong", name));
}
+
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 4465a5e71..0677cd69c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.KafkaConstant;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
@@ -63,7 +63,7 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
private Format format;
@JsonProperty("scanStartupMode")
- private ScanStartupMode scanStartupMode;
+ private KafkaScanStartupMode kafkaScanStartupMode;
@JsonProperty("primaryKey")
private String primaryKey;
@@ -80,14 +80,14 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
@Nonnull @JsonProperty("topic") String topic,
@Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
@Nonnull @JsonProperty("format") Format format,
- @JsonProperty("scanStartupMode") ScanStartupMode scanStartupMode,
+ @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("groupId") String groupId) {
super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
this.format = Preconditions.checkNotNull(format, "kafka format is empty");
- this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode, "kafka scanStartupMode is empty");
+ this.kafkaScanStartupMode = Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is empty");
this.primaryKey = primaryKey;
this.groupId = groupId;
}
@@ -105,7 +105,7 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
if (StringUtils.isEmpty(this.primaryKey)) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
- options.put(KafkaConstant.SCAN_STARTUP_MODE, scanStartupMode.getValue());
+ options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
options.putAll(format.generateOptions(false));
} else {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
@@ -113,7 +113,7 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
}
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
- options.put(KafkaConstant.SCAN_STARTUP_MODE, scanStartupMode.getValue());
+ options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
options.putAll(format.generateOptions(false));
} else {
throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index e0a639fd8..4315ef4ae 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -39,6 +39,6 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<Node> {
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo()));
return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv",
- "localhost:9092", new CsvFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ "localhost:9092", new CsvFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index 94085f7c8..5f1192922 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -33,7 +33,7 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -68,7 +68,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new BuiltInFieldInfo("proctime", new TimestampFormatInfo(), BuiltInField.PROCESS_TIME));
return new KafkaExtractNode("1", "kafka_input", fields, null,
null, "topic_input", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -84,7 +84,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new TimeUnitConstantParam(TimeUnit.SECOND));
return new KafkaExtractNode("1", "kafka_input", fields, wk,
null, "topic_input", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -96,7 +96,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("1", "kafka_input", fields, null,
null, "topic_input", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
private KafkaLoadNode buildKafkaLoadNode() {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index 14ec958d9..b81a54c92 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -87,7 +87,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new StringConstantParam("5"),
new TimeUnitConstantParam(TimeUnit.SECOND));
return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
- "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ "localhost:9092", new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
private KafkaLoadNode buildKafkaNode(String id) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
index d01075ada..cd629ee6c 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -75,7 +75,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()));
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -89,7 +89,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()));
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -104,7 +104,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
index 05eeea8dd..7fae01782 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -75,7 +75,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()));
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -89,7 +89,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()));
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -104,7 +104,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
index ba61be859..5758372fc 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
@@ -32,7 +32,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -76,7 +76,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()));
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -90,7 +90,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()));
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -105,7 +105,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index ebf5f1f50..252ca4f0e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -36,7 +36,7 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -187,7 +187,7 @@ public class MetaFieldSyncTest extends AbstractTestBase {
);
return new KafkaExtractNode("3", "kafka_input", fields,
null, null, "topic1", "localhost:9092",
- new CanalJsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new CanalJsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
index f5462136d..51af295fd 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
@@ -31,7 +31,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
@@ -75,7 +75,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()));
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -89,7 +89,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()));
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
null, "groupId");
}
@@ -104,7 +104,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
+ new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**