You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/21 04:19:39 UTC

[incubator-inlong] branch master updated: [INLONG-4278][Manager] Support Pulsar extract node in Manager (#4287)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 022bc6c6c [INLONG-4278][Manager] Support Pulsar extract node in Manager (#4287)
022bc6c6c is described below

commit 022bc6c6c29436bae674086c94742a6d17984656
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat May 21 12:19:34 2022 +0800

    [INLONG-4278][Manager] Support Pulsar extract node in Manager (#4287)
---
 .../apache/inlong/common/enums/TaskTypeEnum.java   |   4 +-
 .../inlong/manager/common/enums/SourceType.java    |   4 +-
 .../autopush/AutoPushSourceListResponse.java       |   5 +
 .../source/binlog/BinlogSourceListResponse.java    |   5 +
 .../pojo/source/kafka/KafkaSourceListResponse.java |   5 +
 .../pojo/source/kafka/KafkaSourceRequest.java      |   2 +-
 .../common/pojo/source/pulsar/PulsarSourceDTO.java |  89 ++++++++++
 .../PulsarSourceListResponse.java}                 |  44 ++---
 .../PulsarSourceRequest.java}                      |  50 +++---
 .../PulsarSourceResponse.java}                     |  49 +++---
 .../service/sort/CreateSortConfigListener.java     |   1 +
 .../service/sort/CreateSortConfigListenerV2.java   | 187 +++++++++++++++++++++
 .../service/sort/PushSortConfigListener.java       |   1 +
 .../service/sort/light/LightGroupSortListener.java |   9 +-
 .../manager/service/sort/util/DataFlowUtils.java   |   1 +
 .../service/sort/util/ExtractNodeUtils.java        |  58 +++++++
 .../service/sort/util/NodeRelationShipUtils.java   |   1 +
 .../source/pulsar/PulsarSourceOperation.java       | 101 +++++++++++
 .../listener/GroupTaskListenerFactory.java         |   4 +-
 .../sort/protocol/enums/ScanStartupMode.java       |  15 +-
 20 files changed, 557 insertions(+), 78 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index b393167fc..6d12377d8 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -20,7 +20,7 @@ package org.apache.inlong.common.enums;
 import static java.util.Objects.requireNonNull;
 
 public enum TaskTypeEnum {
-    DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4);
+    DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5);
 
     private int type;
 
@@ -41,6 +41,8 @@ public enum TaskTypeEnum {
                 return FILE;
             case 4:
                 return KAFKA;
+            case 5:
+                return PULSAR;
             default:
                 throw new RuntimeException("such task type doesn't exist");
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 2ea9726fa..524de69e8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -31,13 +31,15 @@ public enum SourceType {
     FILE("FILE", TaskTypeEnum.FILE),
     SQL("SQL", TaskTypeEnum.SQL),
     BINLOG("BINLOG", TaskTypeEnum.BINLOG),
-    KAFKA("KAFKA", TaskTypeEnum.KAFKA);
+    KAFKA("KAFKA", TaskTypeEnum.KAFKA),
+    PULSAR("PULSAR", TaskTypeEnum.PULSAR);
 
     public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
     public static final String SOURCE_FILE = "FILE";
     public static final String SOURCE_SQL = "SQL";
     public static final String SOURCE_BINLOG = "BINLOG";
     public static final String SOURCE_KAFKA = "KAFKA";
+    public static final String SOURCE_PULSAR = "PULSAR";
 
     @Getter
     private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
index 68ddeeecd..82d925980 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 
 /**
@@ -33,4 +34,8 @@ public class AutoPushSourceListResponse extends SourceListResponse {
 
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
+
+    public AutoPushSourceListResponse() {
+        this.setSourceType(SourceType.AUTO_PUSH.getType());
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 8ae188fc9..702735077 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 
 /**
@@ -81,4 +82,8 @@ public class BinlogSourceListResponse extends SourceListResponse {
 
     @ApiModelProperty(value = "Primary key must be shared by all tables", required = false)
     private String primaryKey;
+
+    public BinlogSourceListResponse() {
+        this.setSourceType(SourceType.BINLOG.getType());
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index d1a7b445e..d56c3a0a2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 
 /**
@@ -57,4 +58,8 @@ public class KafkaSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    public KafkaSourceListResponse() {
+        this.setSourceType(SourceType.KAFKA.getType());
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 04ae6ac07..afac4eceb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 
 /**
- * Request of the kafka source info
+ * Request of kafka source info
  */
 @Data
 @ToString(callSuper = true)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
new file mode 100644
index 000000000..28f6bf6f8
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.pulsar;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Pulsar source information data transfer object
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class PulsarSourceDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("Pulsar tenant")
+    private String tenant;
+
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
+
+    @ApiModelProperty("Pulsar adminUrl")
+    private String adminUrl;
+
+    @ApiModelProperty("Pulsar serviceUrl")
+    private String serviceUrl;
+
+    @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+    private String primaryKey;
+
+    @ApiModelProperty("Configure the Source's startup mode. "
+            + "Available options are earliest, latest, external-subscription, and specific-offsets.")
+    private String scanStartupMode = "earliest";
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static PulsarSourceDTO getFromRequest(PulsarSourceRequest request) {
+        return PulsarSourceDTO.builder()
+                .adminUrl(request.getAdminUrl())
+                .serviceUrl(request.getServiceUrl())
+                .tenant(request.getTenant())
+                .namespace(request.getNamespace())
+                .topic(request.getTopic())
+                .primaryKey(request.getPrimaryKey())
+                .scanStartupMode(request.getScanStartupMode())
+                .build();
+    }
+
+    public static PulsarSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, PulsarSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceListResponse.java
similarity index 54%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceListResponse.java
index d1a7b445e..e9686e1c7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceListResponse.java
@@ -15,46 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.source.kafka;
+package org.apache.inlong.manager.common.pojo.source.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 
 /**
- * Response of kafka source list
+ * Response of pulsar source list
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
-@ApiModel("Response of kafka source paging list")
-public class KafkaSourceListResponse extends SourceListResponse {
+@ApiModel("Response of pulsar source paging list")
+public class PulsarSourceListResponse extends SourceListResponse {
 
-    @ApiModelProperty("Kafka topic")
-    private String topic;
-
-    @ApiModelProperty("Kafka consumer group")
-    private String groupId;
-
-    @ApiModelProperty("Kafka servers address")
-    private String bootstrapServers;
+    @ApiModelProperty("Pulsar tenant")
+    private String tenant = "default";
 
-    @ApiModelProperty("Limit the amount of data read per second")
-    private String recordSpeedLimit;
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
 
-    @ApiModelProperty("Limit the number of bytes read per second")
-    private String byteSpeedLimit;
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
 
-    @ApiModelProperty(value = "Topic partition offset",
-            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
-    private String topicPartitionOffset;
+    @ApiModelProperty("Pulsar adminUrl")
+    private String adminUrl;
 
-    @ApiModelProperty(value = "The strategy of auto offset reset",
-            notes = "including earliest, latest (the default), none")
-    private String autoOffsetReset;
+    @ApiModelProperty("Pulsar serviceUrl")
+    private String serviceUrl;
 
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("Configure the Source's startup mode."
+            + " Available options are earliest, latest, external-subscription, and specific-offsets.")
+    private String scanStartupMode = "earliest";
+
+    public PulsarSourceListResponse() {
+        this.setSourceType(SourceType.PULSAR.getType());
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceRequest.java
similarity index 50%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceRequest.java
index d1a7b445e..6c58496fb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceRequest.java
@@ -15,46 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.source.kafka;
+package org.apache.inlong.manager.common.pojo.source.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
 
 /**
- * Response of kafka source list
+ * Request of pulsar source info
  */
 @Data
+@ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@ApiModel("Response of kafka source paging list")
-public class KafkaSourceListResponse extends SourceListResponse {
+@ApiModel(value = "Request of the kafka source info")
+@JsonTypeDefine(value = SourceType.SOURCE_PULSAR)
+public class PulsarSourceRequest extends SourceRequest {
 
-    @ApiModelProperty("Kafka topic")
-    private String topic;
-
-    @ApiModelProperty("Kafka consumer group")
-    private String groupId;
-
-    @ApiModelProperty("Kafka servers address")
-    private String bootstrapServers;
+    @ApiModelProperty("Pulsar tenant")
+    private String tenant = "default";
 
-    @ApiModelProperty("Limit the amount of data read per second")
-    private String recordSpeedLimit;
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
 
-    @ApiModelProperty("Limit the number of bytes read per second")
-    private String byteSpeedLimit;
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
 
-    @ApiModelProperty(value = "Topic partition offset",
-            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
-    private String topicPartitionOffset;
+    @ApiModelProperty("Pulsar adminUrl")
+    private String adminUrl;
 
-    @ApiModelProperty(value = "The strategy of auto offset reset",
-            notes = "including earliest, latest (the default), none")
-    private String autoOffsetReset;
+    @ApiModelProperty("Pulsar serviceUrl")
+    private String serviceUrl;
 
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("Configure the Source's startup mode."
+            + " Available options are earliest, latest, external-subscription, and specific-offsets.")
+    private String scanStartupMode = "earliest";
+
+    public PulsarSourceRequest() {
+        this.setSourceType(SourceType.PULSAR.toString());
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
similarity index 51%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
index d1a7b445e..d98d4dd59 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceResponse.java
@@ -15,46 +15,49 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.source.kafka;
+package org.apache.inlong.manager.common.pojo.source.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 
 /**
- * Response of kafka source list
+ * Response of pulsar source
  */
 @Data
+@ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@ApiModel("Response of kafka source paging list")
-public class KafkaSourceListResponse extends SourceListResponse {
+@ApiModel(value = "Response of pulsar source")
+public class PulsarSourceResponse extends SourceResponse {
 
-    @ApiModelProperty("Kafka topic")
-    private String topic;
-
-    @ApiModelProperty("Kafka consumer group")
-    private String groupId;
-
-    @ApiModelProperty("Kafka servers address")
-    private String bootstrapServers;
+    @ApiModelProperty("Pulsar tenant")
+    private String tenant = "default";
 
-    @ApiModelProperty("Limit the amount of data read per second")
-    private String recordSpeedLimit;
+    @ApiModelProperty("Pulsar namespace")
+    private String namespace;
 
-    @ApiModelProperty("Limit the number of bytes read per second")
-    private String byteSpeedLimit;
+    @ApiModelProperty("Pulsar topic")
+    private String topic;
 
-    @ApiModelProperty(value = "Topic partition offset",
-            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
-    private String topicPartitionOffset;
+    @ApiModelProperty("Pulsar adminUrl")
+    private String adminUrl;
 
-    @ApiModelProperty(value = "The strategy of auto offset reset",
-            notes = "including earliest, latest (the default), none")
-    private String autoOffsetReset;
+    @ApiModelProperty("Pulsar serviceUrl")
+    private String serviceUrl;
 
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty("Configure the Source's startup mode. "
+            + "Available options are earliest, latest, external-subscription, and specific-offsets.")
+    private String scanStartupMode = "earliest";
+
+    public PulsarSourceResponse() {
+        this.setSourceType(SourceType.PULSAR.getType());
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
index 510f12662..59b117517 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
@@ -48,6 +48,7 @@ import java.util.stream.Collectors;
 /**
  * Create sort config when disable the ZooKeeper
  */
+@Deprecated
 @Component
 public class CreateSortConfigListener implements SortOperateListener {
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
new file mode 100644
index 000000000..842ea887d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sort;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Component
+@Slf4j
+public class CreateSortConfigListenerV2 implements SortOperateListener {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @Autowired
+    private StreamSourceService sourceService;
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private CommonOperateService commonOperateService;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws Exception {
+        log.info("Create sort config V2 for groupId={}", context.getProcessForm().getInlongGroupId());
+        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+        GroupOperateType groupOperateType = form.getGroupOperateType();
+        if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
+            return ListenerResult.success();
+        }
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        final String groupId = groupInfo.getInlongGroupId();
+        GroupInfo configInfo = createGroupInfo(groupInfo, streamInfos);
+        String dataFlows = OBJECT_MAPPER.writeValueAsString(configInfo);
+
+        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+        extInfo.setInlongGroupId(groupId);
+        extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+        extInfo.setKeyValue(dataFlows);
+        if (groupInfo.getExtList() == null) {
+            groupInfo.setExtList(Lists.newArrayList());
+        }
+        upsertDataFlow(groupInfo, extInfo);
+        return ListenerResult.success();
+    }
+
+    private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+        String groupId = groupInfo.getInlongGroupId();
+        List<SinkResponse> sinkResponses = sinkService.listSink(groupId, null);
+        Map<String, List<SinkResponse>> sinkResponseMap = sinkResponses.stream()
+                .collect(Collectors.groupingBy(sinkResponse -> sinkResponse.getInlongStreamId(), HashMap::new,
+                        Collectors.toCollection(ArrayList::new)));
+        Map<String, List<SourceResponse>> sourceResponseMap = createPulsarSources(groupInfo, streamInfoList);
+        List<StreamInfo> streamInfos = streamInfoList.stream()
+                .map(inlongStreamInfo -> new StreamInfo(inlongStreamInfo.getInlongStreamId(),
+                        createNodesForStream(
+                                sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
+                                sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())),
+                        createNodeRelationShipsForStream(
+                                sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
+                                sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())))
+                ).collect(Collectors.toList());
+        return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
+    }
+
+    private Map<String, List<SourceResponse>> createPulsarSources(InlongGroupInfo groupInfo,
+            List<InlongStreamInfo> streamInfoList) {
+        MQType mqType = MQType.forType(groupInfo.getMqType());
+        if (mqType != MQType.PULSAR) {
+            String errMsg = String.format("Unsupported MqType={} for Inlong", mqType);
+            log.error(errMsg);
+            throw new WorkflowListenerException(errMsg);
+        }
+        Map<String, List<SourceResponse>> sourceReponses = Maps.newHashMap();
+        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+        streamInfoList.stream().forEach(streamInfo -> {
+            PulsarSourceResponse pulsarSourceResponse = new PulsarSourceResponse();
+            pulsarSourceResponse.setSourceName(streamInfo.getInlongStreamId());
+            pulsarSourceResponse.setNamespace(groupInfo.getMqResource());
+            pulsarSourceResponse.setTopic(streamInfo.getMqResource());
+            pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
+            pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+            List<SourceResponse> sourceResponses = sourceService.listSource(groupInfo.getInlongGroupId(),
+                    streamInfo.getInlongStreamId());
+            for (SourceResponse sourceResponse : sourceResponses) {
+                pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+                if (SourceType.forType(sourceResponse.getSourceType()) == SourceType.KAFKA) {
+                    pulsarSourceResponse.setPrimaryKey(((KafkaSourceResponse) sourceResponse).getPrimaryKey());
+                }
+            }
+            pulsarSourceResponse.setScanStartupMode("earliest");
+            pulsarSourceResponse.setFieldList(streamInfo.getFieldList());
+            sourceReponses.computeIfAbsent(streamInfo.getInlongStreamId(), key -> Lists.newArrayList())
+                    .add(pulsarSourceResponse);
+        });
+        return sourceReponses;
+    }
+
+    private List<Node> createNodesForStream(
+            List<SourceResponse> sourceResponses,
+            List<SinkResponse> sinkResponses) {
+        List<Node> nodes = Lists.newArrayList();
+        nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceResponses));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(sinkResponses));
+        return nodes;
+    }
+
+    private List<NodeRelationShip> createNodeRelationShipsForStream(List<SourceResponse> sourceResponses,
+            List<SinkResponse> sinkResponses) {
+        NodeRelationShip relationShip = new NodeRelationShip();
+        List<String> inputs = sourceResponses.stream().map(sourceResponse -> sourceResponse.getSourceName())
+                .collect(Collectors.toList());
+        List<String> outputs = sinkResponses.stream().map(sinkResponse -> sinkResponse.getSinkName())
+                .collect(Collectors.toList());
+        relationShip.setInputs(inputs);
+        relationShip.setOutputs(outputs);
+        return Lists.newArrayList(relationShip);
+    }
+
+    private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
+        groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+        groupInfo.getExtList().add(extInfo);
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
index d5533e1b6..626cb4d4a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
@@ -43,6 +43,7 @@ import java.util.List;
 /**
  * Push sort config when enable the ZooKeeper
  */
+@Deprecated
 @Component
 public class PushSortConfigListener implements SortOperateListener {
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
index da4e6b50c..73fb97f1a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
@@ -94,8 +94,8 @@ public class LightGroupSortListener implements SortOperateListener {
         return ListenerResult.success();
     }
 
-    private GroupInfo createGroupInfo(InlongGroupInfo inlongGroupInfo, List<InlongStreamInfo> inlongStreamInfos) {
-        final String groupId = inlongGroupInfo.getInlongGroupId();
+    private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+        final String groupId = groupInfo.getInlongGroupId();
         List<SourceResponse> sourceResponses = sourceService.listSource(groupId, null);
         Map<String, List<SourceResponse>> sourceResponseMap = sourceResponses.stream()
                 .collect(Collectors.groupingBy(sourceResponse -> sourceResponse.getInlongStreamId(), HashMap::new,
@@ -108,7 +108,7 @@ public class LightGroupSortListener implements SortOperateListener {
         Map<String, List<TransformResponse>> transformResponseMap = transformResponses.stream()
                 .collect(Collectors.groupingBy(transformResponse -> transformResponse.getInlongStreamId(), HashMap::new,
                         Collectors.toCollection(ArrayList::new)));
-        List<StreamInfo> streamInfos = inlongStreamInfos.stream()
+        List<StreamInfo> streamInfos = streamInfoList.stream()
                 .map(inlongStreamInfo -> new StreamInfo(inlongStreamInfo.getInlongStreamId(),
                         createNodesForStream(
                                 sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
@@ -116,11 +116,12 @@ public class LightGroupSortListener implements SortOperateListener {
                                 sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())),
                         NodeRelationShipUtils.createNodeRelationShipsForStream(inlongStreamInfo)))
                 .collect(Collectors.toList());
+        // Rebuild joinerNode relationship
         streamInfos.stream().forEach(streamInfo -> {
             List<TransformResponse> transformResponseList = transformResponseMap.get(streamInfo.getStreamId());
             NodeRelationShipUtils.optimizeNodeRelationShips(streamInfo, transformResponseList);
         });
-        return new GroupInfo(inlongGroupInfo.getInlongGroupId(), streamInfos);
+        return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
     }
 
     private List<Node> createNodesForStream(
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
index 3643e4139..269e804a3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
@@ -49,6 +49,7 @@ import java.util.Map;
  * Util for build data flow info.
  */
 @Service
+@Deprecated
 public class DataFlowUtils {
 
     @Autowired
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index cfe1b2626..62c917500 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -29,12 +29,14 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -67,6 +69,8 @@ public class ExtractNodeUtils {
                 return createExtractNode((BinlogSourceResponse) sourceResponse);
             case KAFKA:
                 return createExtractNode((KafkaSourceResponse) sourceResponse);
+            case PULSAR:
+                return createExtractNode((PulsarSourceResponse) sourceResponse);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -191,4 +195,58 @@ public class ExtractNodeUtils {
                 primaryKey,
                 groupId);
     }
+
+    /**
+     * Create PulsarExtractNode based PulsarSourceResponse
+     *
+     * @param pulsarSourceResponse pulsar source response
+     * @return pulsar extract node info
+     */
+    public static PulsarExtractNode createExtractNode(PulsarSourceResponse pulsarSourceResponse) {
+        String id = pulsarSourceResponse.getSourceName();
+        String name = pulsarSourceResponse.getSourceName();
+        List<InlongStreamFieldInfo> streamFieldInfos = pulsarSourceResponse.getFieldList();
+        List<FieldInfo> fieldInfos = streamFieldInfos.stream()
+                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
+                .collect(Collectors.toList());
+        String topic = pulsarSourceResponse.getTopic();
+
+        Format format = null;
+        DataTypeEnum dataType = DataTypeEnum.forName(pulsarSourceResponse.getSerializationType());
+        switch (dataType) {
+            case CSV:
+                format = new CsvFormat();
+                break;
+            case AVRO:
+                format = new AvroFormat();
+                break;
+            case JSON:
+                format = new JsonFormat();
+                break;
+            case CANAL:
+                format = new CanalJsonFormat();
+                break;
+            case DEBEZIUM_JSON:
+                format = new DebeziumJsonFormat();
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
+        }
+        ScanStartupMode startupMode = ScanStartupMode.forName(pulsarSourceResponse.getScanStartupMode());
+        final String primaryKey = pulsarSourceResponse.getPrimaryKey();
+        final String serviceUrl = pulsarSourceResponse.getServiceUrl();
+        final String adminUrl = pulsarSourceResponse.getAdminUrl();
+
+        return new PulsarExtractNode(id,
+                name,
+                fieldInfos,
+                null,
+                Maps.newHashMap(),
+                topic,
+                adminUrl,
+                serviceUrl,
+                format,
+                startupMode.getValue(),
+                primaryKey);
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index 08e9cbef6..4255b983a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -78,6 +78,7 @@ public class NodeRelationShipUtils {
 
     /**
      * Optimize relationship of node.
+     * JoinerRelationship must be rebuild.
      */
     public static void optimizeNodeRelationShips(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
         if (CollectionUtils.isEmpty(transformResponses)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperation.java
new file mode 100644
index 000000000..c4c6fa4a2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.pulsar;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * Pulsar stream source operation
+ */
+@Service
+public class PulsarSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        PulsarSourceRequest sourceRequest = (PulsarSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            PulsarSourceDTO dto = PulsarSourceDTO.getFromRequest(sourceRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.PULSAR.getType();
+    }
+
+    @Override
+    protected SourceResponse getResponse() {
+        return new PulsarSourceResponse();
+    }
+
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return SourceType.PULSAR == sourceType;
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSourceType();
+        Preconditions.checkTrue(getSourceType().equals(existType),
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+        PulsarSourceDTO dto = PulsarSourceDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+        return result;
+    }
+
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, PulsarSourceListResponse::new));
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index ec3de2f15..0560856a9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
 import org.apache.inlong.manager.service.mq.PulsarEventSelector;
 import org.apache.inlong.manager.service.mq.TubeEventSelector;
 import org.apache.inlong.manager.service.resource.SinkResourceListener;
-import org.apache.inlong.manager.service.sort.CreateSortConfigListener;
+import org.apache.inlong.manager.service.sort.CreateSortConfigListenerV2;
 import org.apache.inlong.manager.service.sort.PushSortConfigListener;
 import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
 import org.apache.inlong.manager.service.sort.ZookeeperEnabledSelector;
@@ -95,7 +95,7 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
     @Autowired
     private PushSortConfigListener pushSortConfigListener;
     @Autowired
-    private CreateSortConfigListener createSortConfigListener;
+    private CreateSortConfigListenerV2 createSortConfigListener;
 
     @Autowired
     private LightGroupSortListener lightGroupSortListener;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
index d3c9600ec..cb9cb943c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/enums/ScanStartupMode.java
@@ -18,13 +18,16 @@
 
 package org.apache.inlong.sort.protocol.enums;
 
+import java.util.Locale;
+
 /**
  * kafka consumer scan startup mode enum
  */
 public enum ScanStartupMode {
     EARLIEST_OFFSET("earliest-offset"),
     LATEST_OFFSET("latest-offset"),
-    ;
+    EXTERNAL_SUBSCRIPTION("external-subscription"),
+    SPECIFIC_OFFSETS("specific-offsets");
 
     ScanStartupMode(String value) {
         this.value = value;
@@ -35,4 +38,14 @@ public enum ScanStartupMode {
     public String getValue() {
         return value;
     }
+
+    public static ScanStartupMode forName(String name) {
+        for (ScanStartupMode startupMode : values()) {
+            if (startupMode.getValue().equals(name.toLowerCase(Locale.ROOT))) {
+                return startupMode;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Unsupported ScanStartupMode=%s for Inlong", name));
+    }
+
 }