You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/21 04:19:39 UTC
[incubator-inlong] branch master updated: [INLONG-4278][Manager] Support Pulsar extract node in Manager (#4287)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 022bc6c6c [INLONG-4278][Manager] Support Pulsar extract node in Manager (#4287)
022bc6c6c is described below
commit 022bc6c6c29436bae674086c94742a6d17984656
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat May 21 12:19:34 2022 +0800
[INLONG-4278][Manager] Support Pulsar extract node in Manager (#4287)
---
.../apache/inlong/common/enums/TaskTypeEnum.java | 4 +-
.../inlong/manager/common/enums/SourceType.java | 4 +-
.../autopush/AutoPushSourceListResponse.java | 5 +
.../source/binlog/BinlogSourceListResponse.java | 5 +
.../pojo/source/kafka/KafkaSourceListResponse.java | 5 +
.../pojo/source/kafka/KafkaSourceRequest.java | 2 +-
.../common/pojo/source/pulsar/PulsarSourceDTO.java | 89 ++++++++++
.../PulsarSourceListResponse.java} | 44 ++---
.../PulsarSourceRequest.java} | 50 +++---
.../PulsarSourceResponse.java} | 49 +++---
.../service/sort/CreateSortConfigListener.java | 1 +
.../service/sort/CreateSortConfigListenerV2.java | 187 +++++++++++++++++++++
.../service/sort/PushSortConfigListener.java | 1 +
.../service/sort/light/LightGroupSortListener.java | 9 +-
.../manager/service/sort/util/DataFlowUtils.java | 1 +
.../service/sort/util/ExtractNodeUtils.java | 58 +++++++
.../service/sort/util/NodeRelationShipUtils.java | 1 +
.../source/pulsar/PulsarSourceOperation.java | 101 +++++++++++
.../listener/GroupTaskListenerFactory.java | 4 +-
.../sort/protocol/enums/ScanStartupMode.java | 15 +-
20 files changed, 557 insertions(+), 78 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index b393167fc..6d12377d8 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -20,7 +20,7 @@ package org.apache.inlong.common.enums;
import static java.util.Objects.requireNonNull;
public enum TaskTypeEnum {
- DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4);
+ DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5);
private int type;
@@ -41,6 +41,8 @@ public enum TaskTypeEnum {
return FILE;
case 4:
return KAFKA;
+ case 5:
+ return PULSAR;
default:
throw new RuntimeException("such task type doesn't exist");
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 2ea9726fa..524de69e8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -31,13 +31,15 @@ public enum SourceType {
FILE("FILE", TaskTypeEnum.FILE),
SQL("SQL", TaskTypeEnum.SQL),
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
- KAFKA("KAFKA", TaskTypeEnum.KAFKA);
+ KAFKA("KAFKA", TaskTypeEnum.KAFKA),
+ PULSAR("PULSAR", TaskTypeEnum.PULSAR);
public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
public static final String SOURCE_FILE = "FILE";
public static final String SOURCE_SQL = "SQL";
public static final String SOURCE_BINLOG = "BINLOG";
public static final String SOURCE_KAFKA = "KAFKA";
+ public static final String SOURCE_PULSAR = "PULSAR";
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
index 68ddeeecd..82d925980 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
/**
@@ -33,4 +34,8 @@ public class AutoPushSourceListResponse extends SourceListResponse {
@ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
private String dataProxyGroup;
+
+ public AutoPushSourceListResponse() {
+ this.setSourceType(SourceType.AUTO_PUSH.getType());
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 8ae188fc9..702735077 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
/**
@@ -81,4 +82,8 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty(value = "Primary key must be shared by all tables", required = false)
private String primaryKey;
+
+ public BinlogSourceListResponse() {
+ this.setSourceType(SourceType.BINLOG.getType());
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index d1a7b445e..d56c3a0a2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
/**
@@ -57,4 +58,8 @@ public class KafkaSourceListResponse extends SourceListResponse {
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ public KafkaSourceListResponse() {
+ this.setSourceType(SourceType.KAFKA.getType());
+ }
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 04ae6ac07..afac4eceb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
/**
- * Request of the kafka source info
+ * Request of kafka source info
*/
@Data
@ToString(callSuper = true)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
new file mode 100644
index 000000000..28f6bf6f8
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.pulsar;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar source information data transfer object.
+ * <p>
+ * Carries the Pulsar-specific settings of a source. Instances are built either from a
+ * {@link PulsarSourceRequest} or by parsing a JSON string.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class PulsarSourceDTO {
+
+    /**
+     * Shared JSON mapper. It is configured exactly once, here, instead of on every
+     * {@link #getFromJson(String)} call: reconfiguring a mapper that other threads may
+     * already be reading with is not safe, and repeating the call is redundant anyway.
+     * Unknown JSON properties are ignored rather than rejected.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty("Pulsar tenant")
+    private String tenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar adminUrl")
+    private String adminUrl;
+
+    @ApiModelProperty("Pulsar serviceUrl")
+    private String serviceUrl;
+
+    @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+    private String primaryKey;
+
+    // NOTE(review): Lombok's @Builder does not apply a field initializer unless the field is
+    // marked @Builder.Default, so this default is only seen by the no-args constructor.
+    // getFromRequest() always sets the value explicitly, so it is unaffected - confirm other builder users.
+    @ApiModelProperty("Configure the Source's startup mode. "
+            + "Available options are earliest, latest, external-subscription, and specific-offsets.")
+    private String scanStartupMode = "earliest";
+
+    /**
+     * Get the dto instance from the request
+     *
+     * @param request the Pulsar source request whose Pulsar-specific fields are copied
+     * @return a new DTO holding the request's adminUrl, serviceUrl, tenant, namespace, topic,
+     *         primaryKey and scanStartupMode
+     */
+    public static PulsarSourceDTO getFromRequest(PulsarSourceRequest request) {
+        return PulsarSourceDTO.builder()
+                .adminUrl(request.getAdminUrl())
+                .serviceUrl(request.getServiceUrl())
+                .tenant(request.getTenant())
+                .namespace(request.getNamespace())
+                .topic(request.getTopic())
+                .primaryKey(request.getPrimaryKey())
+                .scanStartupMode(request.getScanStartupMode())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from a JSON string
+     *
+     * @param extParams JSON representation of the DTO; properties unknown to this class are ignored
+     * @return the parsed DTO
+     * @throws BusinessException with the SOURCE_INFO_INCORRECT message if the string cannot be parsed
+     */
+    public static PulsarSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, PulsarSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceListResponse.java
similarity index 54%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceListResponse.java
index d1a7b445e..e9686e1c7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceListResponse.java
@@ -15,46 +15,46 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.source.kafka;
+package org.apache.inlong.manager.common.pojo.source.pulsar;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
/**
- * Response of kafka source list
+ * Response of pulsar source list
*/
@Data
@EqualsAndHashCode(callSuper = true)
-@ApiModel("Response of kafka source paging list")
-public class KafkaSourceListResponse extends SourceListResponse {
+@ApiModel("Response of pulsar source paging list")
+public class PulsarSourceListResponse extends SourceListResponse {
- @ApiModelProperty("Kafka topic")
- private String topic;
-
- @ApiModelProperty("Kafka consumer group")
- private String groupId;
-
- @ApiModelProperty("Kafka servers address")
- private String bootstrapServers;
+ @ApiModelProperty("Pulsar tenant")
+ private String tenant = "default";
- @ApiModelProperty("Limit the amount of data read per second")
- private String recordSpeedLimit;
+ @ApiModelProperty("Pulsar namespace")
+ private String namespace;
- @ApiModelProperty("Limit the number of bytes read per second")
- private String byteSpeedLimit;
+ @ApiModelProperty("Pulsar topic")
+ private String topic;
- @ApiModelProperty(value = "Topic partition offset",
- notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
- private String topicPartitionOffset;
+ @ApiModelProperty("Pulsar adminUrl")
+ private String adminUrl;
- @ApiModelProperty(value = "The strategy of auto offset reset",
- notes = "including earliest, latest (the default), none")
- private String autoOffsetReset;
+ @ApiModelProperty("Pulsar serviceUrl")
+ private String serviceUrl;
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty("Configure the Source's startup mode."
+ + " Available options are earliest, latest, external-subscription, and specific-offsets.")
+ private String scanStartupMode = "earliest";
+
+ public PulsarSourceListResponse() {
+ this.setSourceType(SourceType.PULSAR.getType());
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceRequest.java
similarity index 50%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceRequest.java
index d1a7b445e..6c58496fb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceRequest.java
@@ -15,46 +15,50 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.source.kafka;
+package org.apache.inlong.manager.common.pojo.source.pulsar;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
/**
- * Response of kafka source list
+ * Request of pulsar source info
*/
@Data
+@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@ApiModel("Response of kafka source paging list")
-public class KafkaSourceListResponse extends SourceListResponse {
+@ApiModel(value = "Request of the pulsar source info")
+@JsonTypeDefine(value = SourceType.SOURCE_PULSAR)
+public class PulsarSourceRequest extends SourceRequest {
- @ApiModelProperty("Kafka topic")
- private String topic;
-
- @ApiModelProperty("Kafka consumer group")
- private String groupId;
-
- @ApiModelProperty("Kafka servers address")
- private String bootstrapServers;
+ @ApiModelProperty("Pulsar tenant")
+ private String tenant = "default";
- @ApiModelProperty("Limit the amount of data read per second")
- private String recordSpeedLimit;
+ @ApiModelProperty("Pulsar namespace")
+ private String namespace;
- @ApiModelProperty("Limit the number of bytes read per second")
- private String byteSpeedLimit;
+ @ApiModelProperty("Pulsar topic")
+ private String topic;
- @ApiModelProperty(value = "Topic partition offset",
- notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
- private String topicPartitionOffset;
+ @ApiModelProperty("Pulsar adminUrl")
+ private String adminUrl;
- @ApiModelProperty(value = "The strategy of auto offset reset",
- notes = "including earliest, latest (the default), none")
- private String autoOffsetReset;
+ @ApiModelProperty("Pulsar serviceUrl")
+ private String serviceUrl;
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty("Configure the Source's startup mode."
+ + " Available options are earliest, latest, external-subscription, and specific-offsets.")
+ private String scanStartupMode = "earliest";
+
+ /**
+ * Pre-set the source type of this request to the Pulsar type string.
+ */
+ public PulsarSourceRequest() {
+ this.setSourceType(SourceType.PULSAR.getType());
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
similarity index 51%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
index d1a7b445e..d98d4dd59 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
@@ -15,46 +15,49 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.source.kafka;
+package org.apache.inlong.manager.common.pojo.source.pulsar;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
/**
- * Response of kafka source list
+ * Response of pulsar source
*/
@Data
+@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@ApiModel("Response of kafka source paging list")
-public class KafkaSourceListResponse extends SourceListResponse {
+@ApiModel(value = "Response of pulsar source")
+public class PulsarSourceResponse extends SourceResponse {
- @ApiModelProperty("Kafka topic")
- private String topic;
-
- @ApiModelProperty("Kafka consumer group")
- private String groupId;
-
- @ApiModelProperty("Kafka servers address")
- private String bootstrapServers;
+ @ApiModelProperty("Pulsar tenant")
+ private String tenant = "default";
- @ApiModelProperty("Limit the amount of data read per second")
- private String recordSpeedLimit;
+ @ApiModelProperty("Pulsar namespace")
+ private String namespace;
- @ApiModelProperty("Limit the number of bytes read per second")
- private String byteSpeedLimit;
+ @ApiModelProperty("Pulsar topic")
+ private String topic;
- @ApiModelProperty(value = "Topic partition offset",
- notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
- private String topicPartitionOffset;
+ @ApiModelProperty("Pulsar adminUrl")
+ private String adminUrl;
- @ApiModelProperty(value = "The strategy of auto offset reset",
- notes = "including earliest, latest (the default), none")
- private String autoOffsetReset;
+ @ApiModelProperty("Pulsar serviceUrl")
+ private String serviceUrl;
@ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
private String primaryKey;
+ @ApiModelProperty("Configure the Source's startup mode. "
+ + "Available options are earliest, latest, external-subscription, and specific-offsets.")
+ private String scanStartupMode = "earliest";
+
+ public PulsarSourceResponse() {
+ this.setSourceType(SourceType.PULSAR.getType());
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
index 510f12662..59b117517 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
@@ -48,6 +48,7 @@ import java.util.stream.Collectors;
/**
* Create sort config when disable the ZooKeeper
*/
+@Deprecated
@Component
public class CreateSortConfigListener implements SortOperateListener {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
new file mode 100644
index 000000000..842ea887d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sort;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Create the sort config (V2) of an inlong group when its workflow task completes.
+ * <p>
+ * For every stream of the group one Pulsar source is derived from the group/stream MQ resources
+ * and the Pulsar cluster info; together with the stream's sinks it is turned into nodes and node
+ * relations. The resulting group config is serialized to JSON and stored in the group's ext list
+ * under the {@link InlongGroupSettings#DATA_FLOW} key.
+ */
+@Component
+@Slf4j
+public class CreateSortConfigListenerV2 implements SortOperateListener {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @Autowired
+    private StreamSourceService sourceService;
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private CommonOperateService commonOperateService;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    /**
+     * Build the sort config of the group and save it into the group's ext list.
+     * Nothing is built for SUSPEND and DELETE operations.
+     *
+     * @param context workflow context whose process form is a {@link GroupResourceProcessForm}
+     * @return a success result
+     * @throws Exception if the config cannot be created or serialized
+     */
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws Exception {
+        log.info("Create sort config V2 for groupId={}", context.getProcessForm().getInlongGroupId());
+        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+        GroupOperateType groupOperateType = form.getGroupOperateType();
+        if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
+            return ListenerResult.success();
+        }
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        final String groupId = groupInfo.getInlongGroupId();
+        GroupInfo configInfo = createGroupInfo(groupInfo, streamInfos);
+        String dataFlows = OBJECT_MAPPER.writeValueAsString(configInfo);
+
+        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+        extInfo.setInlongGroupId(groupId);
+        extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+        extInfo.setKeyValue(dataFlows);
+        if (groupInfo.getExtList() == null) {
+            groupInfo.setExtList(Lists.newArrayList());
+        }
+        upsertDataFlow(groupInfo, extInfo);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Assemble the sort protocol group info: one stream info (nodes + relations) per inlong stream.
+     */
+    private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+        String groupId = groupInfo.getInlongGroupId();
+        List<SinkResponse> sinkResponses = sinkService.listSink(groupId, null);
+        Map<String, List<SinkResponse>> sinkResponseMap = sinkResponses.stream()
+                .collect(Collectors.groupingBy(SinkResponse::getInlongStreamId, HashMap::new,
+                        Collectors.toCollection(ArrayList::new)));
+        Map<String, List<SourceResponse>> sourceResponseMap = createPulsarSources(groupInfo, streamInfoList);
+        // NOTE(review): a stream without any sink has no entry in sinkResponseMap, so null is handed to
+        // the two helpers below and createNodeRelationShipsForStream would fail on it - confirm every
+        // stream reaching this listener is guaranteed to have at least one sink.
+        List<StreamInfo> streamInfos = streamInfoList.stream()
+                .map(inlongStreamInfo -> new StreamInfo(inlongStreamInfo.getInlongStreamId(),
+                        createNodesForStream(
+                                sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
+                                sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())),
+                        createNodeRelationShipsForStream(
+                                sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
+                                sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())))
+                ).collect(Collectors.toList());
+        return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
+    }
+
+    /**
+     * Create one Pulsar source per stream, keyed by the inlong stream id.
+     *
+     * @throws WorkflowListenerException if the MQ type of the group is not Pulsar
+     */
+    private Map<String, List<SourceResponse>> createPulsarSources(InlongGroupInfo groupInfo,
+            List<InlongStreamInfo> streamInfoList) {
+        MQType mqType = MQType.forType(groupInfo.getMqType());
+        if (mqType != MQType.PULSAR) {
+            // String.format needs %s; the former "{}" placeholder is SLF4J syntax and left the type out
+            String errMsg = String.format("Unsupported MqType=%s for Inlong", mqType);
+            log.error(errMsg);
+            throw new WorkflowListenerException(errMsg);
+        }
+        Map<String, List<SourceResponse>> sourcesByStreamId = Maps.newHashMap();
+        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+        for (InlongStreamInfo streamInfo : streamInfoList) {
+            sourcesByStreamId.computeIfAbsent(streamInfo.getInlongStreamId(), key -> Lists.newArrayList())
+                    .add(createPulsarSource(groupInfo, streamInfo, pulsarCluster));
+        }
+        return sourcesByStreamId;
+    }
+
+    /**
+     * Create the Pulsar source of a single stream from the group/stream MQ resources and the cluster info.
+     */
+    private PulsarSourceResponse createPulsarSource(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            PulsarClusterInfo pulsarCluster) {
+        PulsarSourceResponse pulsarSourceResponse = new PulsarSourceResponse();
+        pulsarSourceResponse.setSourceName(streamInfo.getInlongStreamId());
+        pulsarSourceResponse.setNamespace(groupInfo.getMqResource());
+        pulsarSourceResponse.setTopic(streamInfo.getMqResource());
+        pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
+        pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+        List<SourceResponse> existingSources = sourceService.listSource(groupInfo.getInlongGroupId(),
+                streamInfo.getInlongStreamId());
+        // When the stream has several sources, the values of the last one listed win
+        for (SourceResponse existingSource : existingSources) {
+            pulsarSourceResponse.setSerializationType(existingSource.getSerializationType());
+            if (SourceType.forType(existingSource.getSourceType()) == SourceType.KAFKA) {
+                pulsarSourceResponse.setPrimaryKey(((KafkaSourceResponse) existingSource).getPrimaryKey());
+            }
+        }
+        pulsarSourceResponse.setScanStartupMode("earliest");
+        pulsarSourceResponse.setFieldList(streamInfo.getFieldList());
+        return pulsarSourceResponse;
+    }
+
+    /**
+     * Create the extract nodes of the sources followed by the load nodes of the sinks.
+     */
+    private List<Node> createNodesForStream(
+            List<SourceResponse> sourceResponses,
+            List<SinkResponse> sinkResponses) {
+        List<Node> nodes = Lists.newArrayList();
+        nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceResponses));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(sinkResponses));
+        return nodes;
+    }
+
+    /**
+     * Create a single relation whose inputs are all source names and whose outputs are all sink names.
+     */
+    private List<NodeRelationShip> createNodeRelationShipsForStream(List<SourceResponse> sourceResponses,
+            List<SinkResponse> sinkResponses) {
+        NodeRelationShip relationShip = new NodeRelationShip();
+        List<String> inputs = sourceResponses.stream().map(SourceResponse::getSourceName)
+                .collect(Collectors.toList());
+        List<String> outputs = sinkResponses.stream().map(SinkResponse::getSinkName)
+                .collect(Collectors.toList());
+        relationShip.setInputs(inputs);
+        relationShip.setOutputs(outputs);
+        return Lists.newArrayList(relationShip);
+    }
+
+    /**
+     * Replace any existing DATA_FLOW entry of the group's ext list with the given one.
+     */
+    private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
+        groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+        groupInfo.getExtList().add(extInfo);
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
index d5533e1b6..626cb4d4a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
@@ -43,6 +43,7 @@ import java.util.List;
/**
* Push sort config when enable the ZooKeeper
*/
+@Deprecated
@Component
public class PushSortConfigListener implements SortOperateListener {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
index da4e6b50c..73fb97f1a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
@@ -94,8 +94,8 @@ public class LightGroupSortListener implements SortOperateListener {
return ListenerResult.success();
}
- private GroupInfo createGroupInfo(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> inlongStreamInfos) {
- final String groupId = inlongGroupInfo.getInlongGroupId();
+ private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+ final String groupId = groupInfo.getInlongGroupId();
List<SourceResponse> sourceResponses = sourceService.listSource(groupId, null);
Map<String, List<SourceResponse>> sourceResponseMap = sourceResponses.stream()
.collect(Collectors.groupingBy(sourceResponse -> sourceResponse.getInlongStreamId(), HashMap::new,
@@ -108,7 +108,7 @@ public class LightGroupSortListener implements SortOperateListener {
Map<String, List<TransformResponse>> transformResponseMap = transformResponses.stream()
.collect(Collectors.groupingBy(transformResponse -> transformResponse.getInlongStreamId(), HashMap::new,
Collectors.toCollection(ArrayList::new)));
- List<StreamInfo> streamInfos = inlongStreamInfos.stream()
+ List<StreamInfo> streamInfos = streamInfoList.stream()
.map(inlongStreamInfo -> new StreamInfo(inlongStreamInfo.getInlongStreamId(),
createNodesForStream(
sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
@@ -116,11 +116,12 @@ public class LightGroupSortListener implements SortOperateListener {
sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())),
NodeRelationShipUtils.createNodeRelationShipsForStream(inlongStreamInfo)))
.collect(Collectors.toList());
+ // Rebuild joinerNode relationship
streamInfos.stream().forEach(streamInfo -> {
List<TransformResponse> transformResponseList = transformResponseMap.get(streamInfo.getStreamId());
NodeRelationShipUtils.optimizeNodeRelationShips(streamInfo, transformResponseList);
});
- return new GroupInfo(inlongGroupInfo.getInlongGroupId(), streamInfos);
+ return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
}
private List<Node> createNodesForStream(
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
index 3643e4139..269e804a3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
@@ -49,6 +49,7 @@ import java.util.Map;
* Util for build data flow info.
*/
@Service
+@Deprecated
public class DataFlowUtils {
@Autowired
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index cfe1b2626..62c917500 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -29,12 +29,14 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -67,6 +69,8 @@ public class ExtractNodeUtils {
return createExtractNode((BinlogSourceResponse) sourceResponse);
case KAFKA:
return createExtractNode((KafkaSourceResponse) sourceResponse);
+ case PULSAR:
+ return createExtractNode((PulsarSourceResponse) sourceResponse);
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -191,4 +195,58 @@ public class ExtractNodeUtils {
primaryKey,
groupId);
}
+
+    /**
+     * Creates a PulsarExtractNode from a PulsarSourceResponse.
+     *
+     * <p>The source name is used as both the node id and the node name. The serialization type
+     * selects the message format, and the scan startup mode is resolved through
+     * {@code ScanStartupMode.forName}.
+     *
+     * @param pulsarSourceResponse pulsar source response
+     * @return pulsar extract node info
+     * @throws IllegalArgumentException if the serialization type is not one of CSV, AVRO, JSON,
+     *         CANAL or DEBEZIUM_JSON, or if the scan startup mode is not supported
+     */
+    public static PulsarExtractNode createExtractNode(PulsarSourceResponse pulsarSourceResponse) {
+        String id = pulsarSourceResponse.getSourceName();
+        String name = pulsarSourceResponse.getSourceName();
+        List<InlongStreamFieldInfo> streamFieldInfos = pulsarSourceResponse.getFieldList();
+        List<FieldInfo> fieldInfos = streamFieldInfos.stream()
+                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
+                .collect(Collectors.toList());
+        String topic = pulsarSourceResponse.getTopic();
+
+        // Every branch either assigns the format or throws, so no placeholder null is needed.
+        final Format format;
+        DataTypeEnum dataType = DataTypeEnum.forName(pulsarSourceResponse.getSerializationType());
+        switch (dataType) {
+            case CSV:
+                format = new CsvFormat();
+                break;
+            case AVRO:
+                format = new AvroFormat();
+                break;
+            case JSON:
+                format = new JsonFormat();
+                break;
+            case CANAL:
+                format = new CanalJsonFormat();
+                break;
+            case DEBEZIUM_JSON:
+                format = new DebeziumJsonFormat();
+                break;
+            default:
+                // The message names the pulsar source; it previously said "kafka" (copy-paste slip).
+                throw new IllegalArgumentException(
+                        String.format("Unsupported dataType=%s for pulsar source", dataType));
+        }
+        ScanStartupMode startupMode = ScanStartupMode.forName(pulsarSourceResponse.getScanStartupMode());
+        final String primaryKey = pulsarSourceResponse.getPrimaryKey();
+        final String serviceUrl = pulsarSourceResponse.getServiceUrl();
+        final String adminUrl = pulsarSourceResponse.getAdminUrl();
+
+        // NOTE(review): the fourth argument is passed as null and the fifth as an empty map;
+        // presumably the watermark field and extra properties - confirm against PulsarExtractNode.
+        return new PulsarExtractNode(id,
+                name,
+                fieldInfos,
+                null,
+                Maps.newHashMap(),
+                topic,
+                adminUrl,
+                serviceUrl,
+                format,
+                startupMode.getValue(),
+                primaryKey);
+    }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index 08e9cbef6..4255b983a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -78,6 +78,7 @@ public class NodeRelationShipUtils {
/**
* Optimize relationship of node.
+ * JoinerRelationship must be rebuilt.
*/
public static void optimizeNodeRelationShips(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
if (CollectionUtils.isEmpty(transformResponses)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperation.java
new file mode 100644
index 000000000..c4c6fa4a2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.pulsar;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * Pulsar stream source operation
+ */
+@Service
+public class PulsarSourceOperation extends AbstractSourceOperation {
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+ PulsarSourceRequest sourceRequest = (PulsarSourceRequest) request;
+ CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+ try {
+ PulsarSourceDTO dto = PulsarSourceDTO.getFromRequest(sourceRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+ @Override
+ protected String getSourceType() {
+ return SourceType.PULSAR.getType();
+ }
+
+ @Override
+ protected SourceResponse getResponse() {
+ return new PulsarSourceResponse();
+ }
+
+ @Override
+ public Boolean accept(SourceType sourceType) {
+ return SourceType.PULSAR == sourceType;
+ }
+
+ @Override
+ public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+ T result = target.get();
+ if (entity == null) {
+ return result;
+ }
+ String existType = entity.getSourceType();
+ Preconditions.checkTrue(getSourceType().equals(existType),
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+ PulsarSourceDTO dto = PulsarSourceDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(entity, result, true);
+ CommonBeanUtils.copyProperties(dto, result, true);
+ return result;
+ }
+
+ @Override
+ public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+ if (CollectionUtils.isEmpty(entityPage)) {
+ return new PageInfo<>();
+ }
+ return entityPage.toPageInfo(entity -> this.getFromEntity(entity, PulsarSourceListResponse::new));
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index ec3de2f15..0560856a9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
import org.apache.inlong.manager.service.mq.PulsarEventSelector;
import org.apache.inlong.manager.service.mq.TubeEventSelector;
import org.apache.inlong.manager.service.resource.SinkResourceListener;
-import org.apache.inlong.manager.service.sort.CreateSortConfigListener;
+import org.apache.inlong.manager.service.sort.CreateSortConfigListenerV2;
import org.apache.inlong.manager.service.sort.PushSortConfigListener;
import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
import org.apache.inlong.manager.service.sort.ZookeeperEnabledSelector;
@@ -95,7 +95,7 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
@Autowired
private PushSortConfigListener pushSortConfigListener;
@Autowired
- private CreateSortConfigListener createSortConfigListener;
+ private CreateSortConfigListenerV2 createSortConfigListener;
@Autowired
private LightGroupSortListener lightGroupSortListener;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
index d3c9600ec..cb9cb943c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
@@ -18,13 +18,16 @@
package org.apache.inlong.sort.protocol.enums;
+import java.util.Locale;
+
/**
* kafka consumer scan startup mode enum
*/
public enum ScanStartupMode {
EARLIEST_OFFSET("earliest-offset"),
LATEST_OFFSET("latest-offset"),
- ;
+ EXTERNAL_SUBSCRIPTION("external-subscription"),
+ SPECIFIC_OFFSETS("specific-offsets");
ScanStartupMode(String value) {
this.value = value;
@@ -35,4 +38,14 @@ public enum ScanStartupMode {
public String getValue() {
return value;
}
+
+    /**
+     * Resolves a startup mode from its string value, ignoring case.
+     *
+     * @param name value to look up, for example "earliest-offset"
+     * @return the mode whose value equals the lower-cased name
+     * @throws IllegalArgumentException if name is null or matches no mode
+     */
+    public static ScanStartupMode forName(String name) {
+        // Reject null explicitly so callers get the same exception type as for unknown values
+        // instead of a bare NullPointerException.
+        if (name == null) {
+            throw new IllegalArgumentException("Unsupported ScanStartupMode=null for Inlong");
+        }
+        // Lower-case once rather than on every loop iteration.
+        final String normalized = name.toLowerCase(Locale.ROOT);
+        for (ScanStartupMode startupMode : values()) {
+            if (startupMode.getValue().equals(normalized)) {
+                return startupMode;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Unsupported ScanStartupMode=%s for Inlong", name));
+    }
+
}