You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 13:45:33 UTC

[inlong] branch branch-1.4 updated (4634918b5 -> d061d9a4e)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 4634918b5 [INLONG-6484][Sort] Bugfix: fix schema update Circular dependency error in multiple sink iceberg scenes (#6486)
     new b14dbfa7b [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)
     new 80ae93733 [INLONG-6493][Dashboard] After the Stream is configured successfully, some parameters cannot be modified (#6494)
     new 4e511b880 [INLONG-6502][Manager] Optimize some log and code formats (#6503)
     new d061d9a4e [INLONG-6487][Manager] Add API to force delete the stream source (#6489)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/metas/streams/common/StreamDefaultInfo.ts  |  35 +++--
 .../GroupDetail/DataStream/StreamItemModal.tsx     |  14 +-
 .../client/api/{NoAuth.java => StreamSource.java}  |  16 +--
 .../{NoAuthImpl.java => StreamSourceImpl.java}     |  28 ++--
 .../api/inner/client/StreamSourceClient.java       |  17 +++
 .../client/api/service/StreamSourceApi.java        |   4 +
 .../manager/common/consts/InlongConstants.java     |   8 ++
 .../inlong/manager/common/enums/ClusterStatus.java |  15 +--
 .../inlong/manager/common/enums/GroupStatus.java   |   8 +-
 .../manager/dao/entity/WorkflowProcessEntity.java  |   1 +
 .../dao/mapper/StreamSourceEntityMapper.java       |   3 +
 .../dao/mapper/StreamSourceFieldEntityMapper.java  |  11 +-
 .../resources/mappers/StreamSourceEntityMapper.xml |  17 ++-
 .../mappers/StreamSourceFieldEntityMapper.xml      |  15 ++-
 .../mappers/WorkflowProcessEntityMapper.xml        |  27 ++--
 .../inlong/manager/dao/RestTemplateConfig.java     |   5 +-
 .../manager/plugin/flink/TaskRunService.java       |  50 +++----
 .../plugin/listener/DeleteSortListener.java        |  24 ++--
 .../plugin/listener/DeleteStreamListener.java      |  27 ++--
 .../inlong/manager/plugin/util/FlinkUtils.java     |   2 +-
 .../manager/pojo/sink/mysql/MySQLSinkDTO.java      |  13 +-
 .../manager/pojo/workflow/ProcessRequest.java      |   3 +
 .../manager/pojo/workflow/TaskLogRequest.java      |   3 +
 .../form/process/StreamResourceProcessForm.java    |  13 ++
 .../manager/service/core/SortConfigLoader.java     |  32 +++--
 .../manager/service/core/impl/AbstractService.java | 119 -----------------
 .../service/group/InlongGroupProcessService.java   |  23 ++--
 .../service/heartbeat/HeartbeatManager.java        |   2 +-
 .../listener/group/InitGroupCompleteListener.java  |  24 ++--
 .../service/listener/group/InitGroupListener.java  |   4 -
 .../listener/queue/QueueResourceListener.java      |  75 +++++++++++
 .../queue/StreamQueueResourceListener.java         |   3 -
 .../service/listener/sort/SortConfigListener.java  |   9 +-
 .../stream/InitStreamCompleteListener.java         |   7 +-
 .../service/operationlog/OperationLogPool.java     |  29 +++--
 .../queue/kafka/KafkaResourceOperators.java        |  44 ++-----
 .../resource/queue/pulsar/PulsarOperator.java      |   2 +-
 .../queue/pulsar/PulsarResourceOperator.java       | 143 +++++++++------------
 .../resource/sink/es/ElasticsearchConfig.java      |  16 +--
 .../resource/sink/mysql/MySQLJdbcUtils.java        |  88 +++++++------
 .../resource/sink/mysql/MySQLResourceOperator.java |  18 +--
 .../resource/sort/DefaultSortConfigOperator.java   |  21 ++-
 .../service/source/AbstractSourceOperator.java     |   4 +-
 .../service/source/SourceSnapshotOperator.java     |  21 +--
 .../service/source/StreamSourceService.java        |  10 ++
 .../service/source/StreamSourceServiceImpl.java    |  14 ++
 .../source/pulsar/PulsarSourceOperator.java        |   1 +
 .../service/stream/InlongStreamProcessService.java |  46 +++----
 .../service/stream/InlongStreamServiceImpl.java    |   5 +-
 .../service/workflow/WorkflowServiceImpl.java      |   1 +
 .../group/CreateGroupWorkflowDefinition.java       |  24 +---
 .../group/DeleteGroupWorkflowDefinition.java       |   8 +-
 .../stream/CreateStreamWorkflowDefinition.java     |   4 +-
 .../stream/DeleteStreamWorkflowDefinition.java     |   4 +-
 .../inlong/manager/service/RestTemplateConfig.java |   5 +-
 .../group/CreateGroupWorkflowDefinitionTest.java   |   4 +-
 .../resources/application-unit-test.properties     |   6 +-
 .../main/resources/h2/apache_inlong_manager.sql    |   3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |   3 +-
 .../manager/web/auth/openapi/OpenAPIFilter.java    |   2 +-
 .../manager/web/auth/web/AuthenticationFilter.java |   2 +-
 .../manager/web/config/RestTemplateConfig.java     |  10 +-
 .../web/controller/StreamSourceController.java     |  15 ++-
 .../src/main/resources/application-prod.properties |   3 +-
 .../src/main/resources/application-test.properties |   5 +-
 .../event/process/ProcessEventListener.java        |  13 +-
 .../workflow/processor/ServiceTaskProcessor.java   |  18 ++-
 .../workflow/processor/StartEventProcessor.java    |  29 +++--
 68 files changed, 649 insertions(+), 629 deletions(-)
 copy inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/{NoAuth.java => StreamSource.java} (75%)
 copy inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/{NoAuthImpl.java => StreamSourceImpl.java} (58%)
 delete mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java


[inlong] 03/04: [INLONG-6502][Manager] Optimize some log and code formats (#6503)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4e511b880eb57c30a0908fb7abd9e5276a9c2219
Author: healchow <he...@gmail.com>
AuthorDate: Thu Nov 10 19:03:00 2022 +0800

    [INLONG-6502][Manager] Optimize some log and code formats (#6503)
---
 .../manager/common/consts/InlongConstants.java     |   8 ++
 .../inlong/manager/common/enums/ClusterStatus.java |  15 +--
 .../inlong/manager/common/enums/GroupStatus.java   |   8 +-
 .../inlong/manager/dao/RestTemplateConfig.java     |   5 +-
 .../manager/plugin/flink/TaskRunService.java       |  50 ++++----
 .../plugin/listener/DeleteSortListener.java        |  24 ++--
 .../plugin/listener/DeleteStreamListener.java      |  27 ++---
 .../inlong/manager/plugin/util/FlinkUtils.java     |   2 +-
 .../manager/pojo/sink/mysql/MySQLSinkDTO.java      |  13 +-
 .../manager/service/core/SortConfigLoader.java     |  32 +++--
 .../manager/service/core/impl/AbstractService.java | 119 -------------------
 .../service/group/InlongGroupProcessService.java   |  23 ++--
 .../service/heartbeat/HeartbeatManager.java        |   2 +-
 .../listener/queue/QueueResourceListener.java      |  26 ++--
 .../service/listener/sort/SortConfigListener.java  |   6 +-
 .../service/operationlog/OperationLogPool.java     |  29 ++---
 .../queue/kafka/KafkaResourceOperators.java        |  23 ++--
 .../resource/queue/pulsar/PulsarOperator.java      |   2 +-
 .../queue/pulsar/PulsarResourceOperator.java       | 131 ++++++++++-----------
 .../resource/sink/es/ElasticsearchConfig.java      |  16 ++-
 .../resource/sink/mysql/MySQLJdbcUtils.java        |  88 +++++++-------
 .../resource/sink/mysql/MySQLResourceOperator.java |  18 +--
 .../resource/sort/DefaultSortConfigOperator.java   |  13 +-
 .../service/source/AbstractSourceOperator.java     |   4 +-
 .../service/source/SourceSnapshotOperator.java     |  21 ++--
 .../source/pulsar/PulsarSourceOperator.java        |   1 +
 .../service/stream/InlongStreamProcessService.java |  26 ++--
 .../service/stream/InlongStreamServiceImpl.java    |   5 +-
 .../inlong/manager/service/RestTemplateConfig.java |   5 +-
 .../resources/application-unit-test.properties     |   6 +-
 .../manager/web/auth/openapi/OpenAPIFilter.java    |   2 +-
 .../manager/web/auth/web/AuthenticationFilter.java |   2 +-
 .../manager/web/config/RestTemplateConfig.java     |  10 +-
 .../src/main/resources/application-prod.properties |   3 +-
 .../src/main/resources/application-test.properties |   5 +-
 .../event/process/ProcessEventListener.java        |  13 +-
 .../workflow/processor/ServiceTaskProcessor.java   |  18 +--
 .../workflow/processor/StartEventProcessor.java    |  24 ++--
 38 files changed, 365 insertions(+), 460 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 28c2435d2..5d0b0a13d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -22,6 +22,14 @@ package org.apache.inlong.manager.common.consts;
  */
 public class InlongConstants {
 
+    /**
+     * Thread pool related config.
+     */
+    public static final int CORE_POOL_SIZE = 10;
+    public static final int MAX_POOL_SIZE = 20;
+    public static final long ALIVE_TIME_MS = 100L;
+    public static final int QUEUE_SIZE = 10000;
+
     /**
      * Group config
      */
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
index 0cccd1589..e6f3b0d3a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.common.enums;
 
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
-
 /**
  * Status enum of cluster
  */
@@ -28,7 +26,7 @@ public enum ClusterStatus {
 
     INITING(2);
 
-    int status;
+    final int status;
 
     ClusterStatus(int status) {
         this.status = status;
@@ -37,15 +35,4 @@ public enum ClusterStatus {
     public int getStatus() {
         return status;
     }
-
-    public static ClusterStatus fromStatus(int status) {
-        switch (status) {
-            case 1:
-                return NORMAL;
-            case 2:
-                return INITING;
-            default:
-                throw new WorkflowException("unknown status: " + status);
-        }
-    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index dafcfe747..e60b898d5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -49,7 +49,7 @@ public enum GroupStatus {
     DELETING(41, "deleting"),
     DELETED(40, "deleted"),
 
-    // GROUP_FINISH is used for batch task.
+    // FINISH is used for batch task.
     FINISH(131, "finish");
 
     private static final Map<GroupStatus, Set<GroupStatus>> GROUP_STATE_AUTOMATON = Maps.newHashMap();
@@ -60,8 +60,7 @@ public enum GroupStatus {
     static {
         GROUP_STATE_AUTOMATON.put(DRAFT, Sets.newHashSet(DRAFT, TO_BE_SUBMIT, DELETING));
         GROUP_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
-        GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL,
-                Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED, DELETING));
+        GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED));
 
         GROUP_STATE_AUTOMATON.put(APPROVE_REJECTED, Sets.newHashSet(APPROVE_REJECTED, TO_BE_APPROVAL, DELETING));
         GROUP_STATE_AUTOMATON.put(APPROVE_PASSED, Sets.newHashSet(APPROVE_PASSED, CONFIG_ING, DELETING));
@@ -149,6 +148,7 @@ public enum GroupStatus {
 
     @Override
     public String toString() {
-        return name().toLowerCase(Locale.ROOT).replace("group_", "");
+        return name().toLowerCase(Locale.ROOT);
     }
+
 }
diff --git a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
index 4a5e7b804..3cbb57840 100644
--- a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
+++ b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
@@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -79,8 +80,8 @@ public class RestTemplateConfig {
     public PoolingHttpClientConnectionManager httpClientConnectionManager() {
         // Support HTTP, HTTPS
         Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
-                .register("http", PlainConnectionSocketFactory.getSocketFactory())
-                .register("https", SSLConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
                 .build();
         PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
                 registry);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
index 51109e2c7..bbc9c64fd 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
@@ -17,71 +17,61 @@
 
 package org.apache.inlong.manager.plugin.flink;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Task run service.
  */
 public class TaskRunService {
 
-    private static final ExecutorService executorService;
-
-    private static final int CORE_POOL_SIZE = 16;
-    private static final int MAXIMUM_POOL_SIZE = 32;
-    private static final int QUEUE_SIZE = 10000;
-    private static final long KEEP_ALIVE_TIME = 0L;
-
-    static {
-        executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
-                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>(QUEUE_SIZE));
-    }
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
+            new ThreadFactoryBuilder().setNameFormat("inlong-plugin-%s").build(),
+            new CallerRunsPolicy());
 
     /**
      * execute
-     *
-     * @param runnable
      */
     public static void execute(Runnable runnable) {
-        executorService.execute(runnable);
+        EXECUTOR_SERVICE.execute(runnable);
     }
 
     /**
      * submit
-     *
-     * @param runnable
-     * @return
      */
     public static Future<?> submit(Runnable runnable) {
-        return executorService.submit(runnable);
+        return EXECUTOR_SERVICE.submit(runnable);
     }
 
     /**
      * submit
-     *
-     * @param runnable
-     * @param defaultValue
-     * @param <T>
-     * @return
      */
     public static <T> Future<T> submit(Runnable runnable, T defaultValue) {
-        return executorService.submit(runnable, defaultValue);
+        return EXECUTOR_SERVICE.submit(runnable, defaultValue);
     }
 
     /**
      * submit
-     *
-     * @param callable
-     * @param <T>
-     * @return
      */
     public static <T> Future<T> submit(Callable<T> callable) {
-        return executorService.submit(callable);
+        return EXECUTOR_SERVICE.submit(callable);
     }
 
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 6e65f30dc..7eae0e9ca 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -42,7 +42,7 @@ import java.util.Map;
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
- * Listener of delete sort.
+ * Listener of delete Sort task or config.
  */
 @Slf4j
 public class DeleteSortListener implements SortOperateListener {
@@ -57,17 +57,17 @@ public class DeleteSortListener implements SortOperateListener {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add delete group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+            log.info("not add delete group listener, not GroupResourceProcessForm for groupId={}", groupId);
             return false;
         }
 
         GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
         if (groupProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
-            log.info("not add delete group listener, as the operate was not DELETE for groupId [{}]", groupId);
+            log.info("not add delete group listener, as the operate was not DELETE for groupId={}", groupId);
             return false;
         }
 
-        log.info("add delete group listener for groupId [{}]", groupId);
+        log.info("add delete group listener for groupId={}", groupId);
         return true;
     }
 
@@ -76,7 +76,7 @@ public class DeleteSortListener implements SortOperateListener {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
+            String message = String.format("process form was not GroupResourceProcessForm for groupId=%s", groupId);
             log.error(message);
             return ListenerResult.fail(message);
         }
@@ -90,10 +90,8 @@ public class DeleteSortListener implements SortOperateListener {
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
+            log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId);
+            return ListenerResult.success();
         }
 
         Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
@@ -102,7 +100,7 @@ public class DeleteSortListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s]", groupId);
+            String message = String.format("sort job id is empty for groupId=%s", groupId);
             return ListenerResult.fail(message);
         }
 
@@ -115,16 +113,16 @@ public class DeleteSortListener implements SortOperateListener {
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
             flinkOperation.delete(flinkInfo);
-            log.info("job delete success for [{}]", jobId);
+            log.info("job delete success for jobId={}", jobId);
             return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("delete sort failed for groupId [%s] ", groupId);
+            String message = String.format("delete sort failed for groupId=%s", groupId);
             log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
+            return ListenerResult.fail(message + ": " + e.getMessage());
         }
     }
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 7c50d9738..64006410b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -59,19 +59,19 @@ public class DeleteStreamListener implements SortOperateListener {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add delete stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            log.info("not add delete stream listener, not StreamResourceProcessForm for groupId={}", groupId);
             return false;
         }
 
         StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
         String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
         if (streamProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
-            log.info("not add delete stream listener, as the operate was not DELETE for groupId [{}] streamId [{}]",
+            log.info("not add delete stream listener, as the operate was not DELETE for groupId={} streamId={}",
                     groupId, streamId);
             return false;
         }
 
-        log.info("add delete stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        log.info("add delete stream listener for groupId={} streamId={}", groupId, streamId);
         return true;
     }
 
@@ -81,26 +81,23 @@ public class DeleteStreamListener implements SortOperateListener {
         StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
         InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
         List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
-        log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+        log.info("inlong group: {} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+
         InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
         List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
-        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+        log.info("inlong stream: {} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
 
         Map<String, String> kvConf = new HashMap<>();
         groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.forEach(extInfo -> {
-            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
-        });
+        streamExtList.forEach(extInfo -> kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue()));
 
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format(
-                    "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+            log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty",
                     groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
+            return ListenerResult.success();
         }
 
         Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
@@ -109,7 +106,7 @@ public class DeleteStreamListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
+            String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId);
             return ListenerResult.fail(message);
         }
 
@@ -122,14 +119,14 @@ public class DeleteStreamListener implements SortOperateListener {
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
             flinkOperation.delete(flinkInfo);
-            log.info("job delete success for [{}]", jobId);
+            log.info("job delete success for jobId={}", jobId);
             return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("delete sort failed for groupId [%s] streamId [%s]", groupId, streamId);
+            String message = String.format("delete sort failed for groupId=%s streamId=%s", groupId, streamId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 25052df49..c3daefecf 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -101,7 +101,7 @@ public class FlinkUtils {
 
         File baseDir = new File(baseDirName);
         if (!baseDir.exists() || !baseDir.isDirectory()) {
-            log.error("baseDirName find fail :{}", baseDirName);
+            log.error("baseDirName find fail: {}", baseDirName);
             return result;
         }
         String tempName;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index 21604f777..6363a8ec1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -145,26 +145,25 @@ public class MySQLSinkDTO {
         }
 
         String connUri = jdbcUrl.substring(pos1 + 1);
-        int pos;
         if (connUri.startsWith("//")) {
-            if ((pos = connUri.indexOf('/', 2)) != -1) {
+            int pos = connUri.indexOf('/', 2);
+            if (pos != -1) {
                 database = connUri.substring(pos + 1);
             }
         } else {
             database = connUri;
         }
 
+        if (Strings.isNullOrEmpty(database)) {
+            throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl);
+        }
+
         if (database.contains("?")) {
             database = database.substring(0, database.indexOf("?"));
         }
-
         if (database.contains(";")) {
             database = database.substring(0, database.indexOf(";"));
         }
-
-        if (Strings.isNullOrEmpty(database)) {
-            throw new IllegalArgumentException("Invalid JDBC url.");
-        }
         return database;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
index 6b0a02fc5..7f5469aa6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
@@ -34,64 +34,76 @@ import java.util.List;
  * Loader for sort service to load configs thought Cursor
  */
 public interface SortConfigLoader {
+
     /**
      * Load all clusters by cursor
-     * @return List of clusters, including MQ cluster and DataNode cluster.
+     *
+     * @return list of clusters, including MQ cluster and DataProxy cluster
      */
     List<SortSourceClusterInfo> loadAllClusters();
 
     /**
      * Load stream sinks by cursor
-     * @return List of Stream sinks.
+     *
+     * @return list of stream sinks
      */
     List<SortSourceStreamSinkInfo> loadAllStreamSinks();
 
     /**
      * Load groups by cursor
-     * @return List of group info
+     *
+     * @return list of group info
      */
     List<SortSourceGroupInfo> loadAllGroup();
 
     /**
-     * Load group backup info by cursor
-     * @param keyName Key name
-     * @return List of group backup info
+     * Load backup group info by cursor
+     *
+     * # @param keyName key name
+     *
+     * @return list of backup group info
      */
     List<InlongGroupExtEntity> loadGroupBackupInfo(String keyName);
 
     /**
-     * Load stream backup info by cursor
-     * @param keyName Key name
-     * @return List of stream backup info
+     * Load backup stream info by cursor
+     *
+     * @param keyName key name
+     * @return list of backup stream info
      */
     List<InlongStreamExtEntity> loadStreamBackupInfo(String keyName);
 
     /**
      * Load all inlong stream info by cursor
-     * @return List of stream info
+     *
+     * @return list of stream info
      */
     List<SortSourceStreamInfo> loadAllStreams();
 
     /**
      * Load all inlong stream sink entity by cursor
+     *
      * @return List of stream sink entity
      */
     List<StreamSinkEntity> loadAllStreamSinkEntity();
 
     /**
      * Load all task info
+     *
      * @return List of tasks
      */
     List<SortTaskInfo> loadAllTask();
 
     /**
      * Load all data node entity
+     *
      * @return List of data node
      */
     List<DataNodeEntity> loadAllDataNodeEntity();
 
     /**
      * Load all fields info
+     *
      * @return List of fields info
      */
     List<SortFieldInfo> loadAllFields();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
deleted file mode 100644
index 3b819f898..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Value;
-
-public abstract class AbstractService<T> implements AutoCloseable, InitializingBean {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractService.class);
-
-
-    @Value("${msg.to.db.batch.size:10}")
-    private int batchSize = 10;
-
-    @Value("${msg.to.db.queue.size:10000}")
-    private int queueSize = 10000;
-
-    @Value("${msg.to.db.core.pool.size:2}")
-    private int corePoolSize = 2;
-
-    @Value("${msg.to.db.max.pool.size:2}")
-    private int maximumPoolSize = 2;
-
-    @Value("${msg.to.db.queue.pool.size:10}")
-    private int syncSendQueueSize = 10;
-
-    private volatile boolean isClose = false;
-
-    private LinkedBlockingQueue<T> dataQueue;
-
-    private ThreadPoolExecutor pool;
-
-    /**
-     * batch insert entities
-     *
-     * @param entryList entryList
-     * @return boolean true/false
-     */
-    abstract boolean batchInsertEntities(List<T> entryList);
-
-    /**
-     * put Data
-     *
-     * @param data data
-     * @return boolean true/false
-     */
-    public boolean putData(T data) {
-        if (dataQueue == null) {
-            return false;
-        }
-        return dataQueue.offer(data);
-    }
-
-    @Override
-    public void close() {
-        isClose = true;
-    }
-
-    @Override
-    public void afterPropertiesSet() {
-        dataQueue = new LinkedBlockingQueue(queueSize);
-        pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
-                60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(syncSendQueueSize),
-                Executors.defaultThreadFactory());
-        for (int i = 0; i < corePoolSize; i++) {
-            pool.execute(new Task());
-        }
-    }
-
-    class Task implements Runnable {
-        @Override
-        public void run() {
-            while (!isClose) {
-                try {
-                    List<T> entryList = new ArrayList<>();
-                    int count = 0;
-                    while (count < batchSize) {
-                        T data = dataQueue.poll(1, TimeUnit.MILLISECONDS);
-                        if (data != null) {
-                            entryList.add(data);
-                        }
-                        count++;
-                    }
-                    if (CollectionUtils.isNotEmpty(entryList)) {
-                        batchInsertEntities(entryList);
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("batchInsertEntities has exception = {}", e);
-                }
-            }
-        }
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index 8d950b706..de044f0aa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -48,6 +48,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operation to the inlong group process
  */
@@ -56,12 +61,12 @@ public class InlongGroupProcessService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessService.class);
 
-    private final ExecutorService executorService = new ThreadPoolExecutor(
-            20,
-            40,
-            0L,
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("inlong-group-process-%s").build(),
             new CallerRunsPolicy());
 
@@ -107,7 +112,7 @@ public class InlongGroupProcessService {
         groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
-        executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+        EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
 
         LOGGER.info("success to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
@@ -147,7 +152,7 @@ public class InlongGroupProcessService {
         groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
-        executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+        EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
 
         LOGGER.info("success to restart process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
@@ -181,7 +186,7 @@ public class InlongGroupProcessService {
      */
     public String deleteProcessAsync(String groupId, String operator) {
         LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
-        executorService.execute(() -> {
+        EXECUTOR_SERVICE.execute(() -> {
             try {
                 invokeDeleteProcess(groupId, operator);
             } catch (Exception ex) {
@@ -255,7 +260,7 @@ public class InlongGroupProcessService {
             List<WorkflowProcessEntity> entities = workflowQueryService.listProcessEntity(processQuery);
             entities.sort(Comparator.comparingInt(WorkflowProcessEntity::getId));
             WorkflowProcessEntity lastProcess = entities.get(entities.size() - 1);
-            executorService.execute(() -> {
+            EXECUTOR_SERVICE.execute(() -> {
                 workflowService.continueProcess(lastProcess.getId(), operator, "Reset group status");
             });
             return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 010afee02..16a96a8f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -79,7 +79,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
                     }
                 }).build();
 
-        // The expire time of cluster info cache must be greater than heartbeat cache
+        // The expiry time of cluster info cache must be greater than heartbeat cache
         // because the eviction handler needs to query cluster info cache
         clusterInfoCache = Caffeine.newBuilder()
                 .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 390c11b48..152832ba1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -48,6 +48,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
 import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT;
 import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE;
 
@@ -59,21 +63,23 @@ import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_R
 @Service
 public class QueueResourceListener implements QueueOperateListener {
 
+    private static final Integer TIMEOUT_SECONDS = 180;
+
     private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
-            20,
-            40,
-            10L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(),
-            new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
+            new ThreadFactoryBuilder().setNameFormat("inlong-mq-process-%s").build(),
             new CallerRunsPolicy());
 
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private QueueResourceOperatorFactory queueOperatorFactory;
-    @Autowired
     private WorkflowService workflowService;
+    @Autowired
+    private QueueResourceOperatorFactory queueOperatorFactory;
 
     @Override
     public TaskEvent event() {
@@ -152,7 +158,9 @@ public class QueueResourceListener implements QueueOperateListener {
                         }
                     });
             try {
-                future.get(180, TimeUnit.SECONDS);
+                // wait for the current process complete before starting the next stream,
+                // otherwise, an exception is thrown and the next stream process will not be started.
+                future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
             } catch (Exception e) {
                 String msg = "failed to execute stream process in asynchronously ";
                 log.error(msg, e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 08a310ddb..39a3b78dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -77,15 +77,17 @@ public class SortConfigListener implements SortOperateListener {
 
         GroupOperateType operateType = form.getGroupOperateType();
         if (operateType == GroupOperateType.SUSPEND || operateType == GroupOperateType.DELETE) {
-            LOGGER.info("not build sort config for groupId={}, as the group operate type={}", groupId, operateType);
+            LOGGER.info("no need to build sort config for groupId={} as the operate type is {}", groupId, operateType);
             return ListenerResult.success();
         }
+
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
         if (CollectionUtils.isEmpty(streamInfos)) {
-            LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
+            LOGGER.warn("no need to build sort config for groupId={}, as not found any stream", groupId);
             return ListenerResult.success();
         }
+
         int sinkCount = streamInfos.stream()
                 .map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size())
                 .reduce(0, Integer::sum);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
index 85b8e7eb7..cf7bd4828 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
@@ -36,6 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operation log thread pool
  */
@@ -43,24 +46,23 @@ import java.util.stream.IntStream;
 @Component
 public class OperationLogPool {
 
-    private static final int BUFFER_SIZE = 500;
-    private static final int MAX_WAIT_TIME_SECOND = 30;
-    private static final int MAX_QUEUE_SIZE = 10000;
+    private static final int TIMEOUT_SECOND = 30;
     private static final int THREAD_NUM = 3;
-    private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(
-            MAX_QUEUE_SIZE);
+    private static final int BUFFER_SIZE = 500;
+
+    private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(QUEUE_SIZE);
 
-    private final ExecutorService executorService = new ThreadPoolExecutor(
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
             THREAD_NUM,
             THREAD_NUM,
-            0L,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
-            new ThreadFactoryBuilder().setNameFormat("async-operation-log-%s").build(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
+            new ThreadFactoryBuilder().setNameFormat("inlong-operation-log-%s").build(),
             new CallerRunsPolicy());
 
     @Autowired
-    private OperationLogEntityMapper operationLogEntityMapper;
+    private OperationLogEntityMapper operationLogMapper;
 
     public static void publish(OperationLogEntity operation) {
         if (!OPERATION_POOL.offer(operation)) {
@@ -71,7 +73,7 @@ public class OperationLogPool {
     @PostConstruct
     public void init() {
         IntStream.range(0, THREAD_NUM).forEach(
-                i -> executorService.submit(this::saveOperationLog)
+                i -> EXECUTOR_SERVICE.submit(this::saveOperationLog)
         );
     }
 
@@ -79,14 +81,13 @@ public class OperationLogPool {
         List<OperationLogEntity> buffer = new ArrayList<>(BUFFER_SIZE);
         while (true) {
             buffer.clear();
-            int size = 0;
             try {
-                size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, MAX_WAIT_TIME_SECOND, TimeUnit.SECONDS);
+                int size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, TIMEOUT_SECOND, TimeUnit.SECONDS);
                 if (buffer.isEmpty()) {
                     continue;
                 }
                 long startTime = System.currentTimeMillis();
-                operationLogEntityMapper.insertBatch(buffer);
+                operationLogMapper.insertBatch(buffer);
                 log.info("receive {} logs and saved cost {} ms", size, System.currentTimeMillis() - startTime);
             } catch (InterruptedException e) {
                 log.error("save operation log interrupted", e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 2f65aa735..2157acb1d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -51,14 +51,14 @@ public class KafkaResourceOperators implements QueueResourceOperator {
      */
     private static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
 
-    @Autowired
-    private InlongClusterService clusterService;
     @Autowired
     private KafkaOperator kafkaOperator;
     @Autowired
     private InlongStreamService streamService;
     @Autowired
     private InlongConsumeService consumeService;
+    @Autowired
+    private InlongClusterService clusterService;
 
     @Override
     public boolean accept(String mqType) {
@@ -67,7 +67,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
+        log.info("skip to create kafka topic for groupId={}, just create in each inlong stream",
+                groupInfo.getInlongGroupId());
     }
 
     @Override
@@ -84,7 +85,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
                 return;
             }
             for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
+                this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource());
             }
         } catch (Exception e) {
             log.error("failed to delete kafka resource for groupId=" + groupId, e);
@@ -94,8 +95,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     }
 
     @Override
-    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            String operator) {
+    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
         Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
         Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
         Preconditions.checkNotNull(operator, "operator cannot be null");
@@ -117,8 +117,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     }
 
     @Override
-    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            String operator) {
+    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
         Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
         Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
 
@@ -141,7 +140,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
      * Create Kafka Topic and Subscription, and save the consumer group info.
      */
     private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception {
-        // 1. create kafka topic
+        // create Kafka topic
         ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
         kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
 
@@ -153,7 +152,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
         }
 
         // Kafka consumers do not need to register in advance
-        // 2. insert the consumer group info
+
+        // save the consumer group info for the Kafka topic
         String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
         Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup);
         log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
@@ -163,9 +163,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     /**
      * Delete Kafka Topic and Subscription, and delete the consumer group info.
      */
-    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String streamId) {
+    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String topicName) {
         ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        String topicName = groupInfo.getInlongGroupId() + "_" + streamId;
         kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, topicName);
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 3ebeeca37..b82074d74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -353,7 +353,7 @@ public class PulsarOperator {
                 LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                 if (count == RETRY_TIMES) {
                     LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
-                    throw new BusinessException("check if the subscription exists error");
+                    throw new BusinessException("check if the subscription exists error: " + e.getMessage());
                 }
             }
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 2e379b285..69a38ec6e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -44,7 +45,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -80,72 +80,67 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         Preconditions.checkNotNull(operator, "operator cannot be null");
 
         String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create pulsar resource for groupId={}", groupId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
 
         // get pulsar cluster via the inlong cluster tag from the inlong group
-        String clusterTag = groupInfo.getInlongClusterTag();
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-                String clusterName = pulsarCluster.getName();
                 // create pulsar tenant and namespace
                 String tenant = pulsarCluster.getTenant();
                 if (StringUtils.isEmpty(tenant)) {
                     tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
                 }
-                String namespace = groupInfo.getMqResource();
-                InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+
                 // if the group was not successful, need create tenant and namespace
                 if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
                     pulsarOperator.createTenant(pulsarAdmin, tenant);
-                    log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}",
-                            groupId, tenant, clusterName);
-                    pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
-                    log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
-                            groupId, namespace, clusterName);
+                    String namespace = groupInfo.getMqResource();
+                    pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace);
+
+                    log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
+                            groupId, tenant, namespace, pulsarCluster);
                 }
             } catch (Exception e) {
-                String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
-                        pulsarCluster.toString());
-                log.error(msg, e);
+                String msg = "failed to create pulsar resource for groupId=" + groupId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
                 throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
-        log.info("success to create pulsar resource for groupId={}", groupId);
+
+        log.info("success to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
         Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
-
         String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete pulsar resource for groupId={}", groupId);
-
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
-            String clusterName = clusterInfo.getName();
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
+
+        List<InlongStreamBriefInfo> streamInfos = streamService.getTopicList(groupId);
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            log.warn("skip to delete pulsar resource as no streams for groupId={}", groupId);
+            return;
+        }
+
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
-                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-                if (streamInfoList == null || streamInfoList.isEmpty()) {
-                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
-                            groupId, clusterName);
-                    return;
-                }
-                for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                    this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+                for (InlongStreamBriefInfo streamInfo : streamInfos) {
+                    this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
                 }
             } catch (Exception e) {
-                log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e);
-                throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
+                String msg = "failed to delete pulsar resource for groupId=" + groupId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
-        log.info("success to delete pulsar resource for groupId={}", groupId);
+
+        log.info("success to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
     }
 
     @Override
@@ -156,22 +151,25 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         String groupId = streamInfo.getInlongGroupId();
         String streamId = streamInfo.getInlongStreamId();
-        log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to create pulsar resource for groupId={}, streamId={}, clusterTag={}",
+                groupId, streamId, clusterTag);
 
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+        // get pulsar cluster via the inlong cluster tag from the inlong group
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
                 // create pulsar topic and subscription
-                this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
-                this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster,
-                        streamInfo.getMqResource(), streamId);
+                String topicName = streamInfo.getMqResource();
+                this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, topicName);
+                this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, topicName, streamId);
+
+                log.info("success to create pulsar resource for groupId={}, streamId={}, topic={}, cluster={}",
+                        groupId, streamId, topicName, pulsarCluster);
             } catch (Exception e) {
-                String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
-                        groupId, streamId, pulsarCluster.getName());
-                log.error(msg, e);
+                String msg = "failed to create pulsar resource for groupId=" + groupId + ", streamId=" + streamId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
                 throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
@@ -186,25 +184,24 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         String groupId = streamInfo.getInlongGroupId();
         String streamId = streamInfo.getInlongStreamId();
-        log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);
-
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
-            String clusterName = clusterInfo.getName();
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to delete pulsar resource for groupId={}, streamId={}, clusterTag={}",
+                groupId, streamId, clusterTag);
+
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
-                this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
-                log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}",
-                        groupId, streamId, clusterName);
+                this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+                log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}",
+                        groupId, streamId, streamInfo.getMqResource(), pulsarCluster);
             } catch (Exception e) {
-                String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s",
-                        groupId, streamId, clusterName);
-                log.error(msg, e);
-                throw new WorkflowListenerException(msg);
+                String msg = "failed to delete pulsar topic for groupId=" + groupId + ", streamId=" + streamId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
+
         log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
index 6cccff706..c2df68f05 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
@@ -17,17 +17,14 @@
 
 package org.apache.inlong.manager.service.resource.sink.es;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import lombok.Data;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
@@ -36,6 +33,9 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Elasticsearch config information, including host, port, etc.
  */
@@ -43,6 +43,8 @@ import org.springframework.stereotype.Component;
 @Component
 public class ElasticsearchConfig {
 
+    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
+    private static RestHighLevelClient highLevelClient;
     @Value("${es.index.search.hostname}")
     private String host;
     @Value("${es.index.search.port}")
@@ -54,10 +56,6 @@ public class ElasticsearchConfig {
     @Value("${es.auth.password}")
     private String password;
 
-    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
-
-    private static RestHighLevelClient highLevelClient;
-
     /**
      * highLevelClient
      *
@@ -75,7 +73,7 @@ public class ElasticsearchConfig {
                     for (String host : hostArrays) {
                         if (StringUtils.isNotBlank(host)) {
                             host = host.trim();
-                            hosts.add(new HttpHost(host, port, "http"));
+                            hosts.add(new HttpHost(host, port, ProtocolType.HTTP));
                         }
                     }
                     RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
index 441779c5c..d463ca9ab 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
@@ -38,10 +38,8 @@ import java.util.Objects;
 public class MySQLJdbcUtils {
 
     private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql";
-
     private static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-
-    private static final Logger LOG = LoggerFactory.getLogger(MySQLJdbcUtils.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(MySQLJdbcUtils.class);
 
     /**
      * Get MySQL connection from the url and user.
@@ -52,46 +50,46 @@ public class MySQLJdbcUtils {
      * @return {@link Connection}
      * @throws Exception on get connection error
      */
-    public static Connection getConnection(final String url, final String user, final String password)
-            throws Exception {
+    public static Connection getConnection(String url, String user, String password) throws Exception {
         if (StringUtils.isBlank(url) || !url.startsWith(MYSQL_JDBC_PREFIX)) {
-            throw new Exception("MySQL server URL was invalid, it should start with jdbc:mysql");
+            throw new Exception("MySQL JDBC URL was invalid, it should start with jdbc:mysql");
         }
+
         Connection conn;
         try {
             Class.forName(MYSQL_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
         } catch (Exception e) {
-            final String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!";
-            LOG.error(errorMsg, e);
+            String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!";
+            LOGGER.error(errorMsg, e);
             throw new Exception(errorMsg + " other error msg: " + e.getMessage());
         }
         if (Objects.isNull(conn)) {
             throw new Exception("get MySQL connection failed, please contact administrator.");
         }
-        LOG.info("get MySQL connection success, url={}", url);
+        LOGGER.info("get MySQL connection success for url={}", url);
         return conn;
     }
 
     /**
      * Execute SQL command on MySQL.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param sql SQL string to be executed
+     * @param conn JDBC {@link Connection}
+     * @param sql SQL to be executed
      * @throws Exception on execute SQL error
      */
     public static void executeSql(final Connection conn, final String sql) throws Exception {
         try (Statement stmt = conn.createStatement()) {
             stmt.execute(sql);
-            LOG.info("execute sql [{}] success", sql);
+            LOGGER.info("execute sql [{}] success", sql);
         }
     }
 
     /**
      * Execute batch query SQL on MySQL.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param sqls SQL string to be executed
+     * @param conn JDBC {@link Connection}
+     * @param sqls SQL to be executed
      * @throws Exception on get execute SQL batch error
      */
     public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception {
@@ -101,7 +99,7 @@ public class MySQLJdbcUtils {
                 stmt.execute(entry);
             }
             conn.commit();
-            LOG.info("execute sql [{}] success", sqls);
+            LOGGER.info("execute sql [{}] success", sqls);
         } finally {
             conn.setAutoCommit(true);
         }
@@ -110,7 +108,7 @@ public class MySQLJdbcUtils {
     /**
      * Create MySQL database
      *
-     * @param conn JDBC Connection  {@link Connection}
+     * @param conn JDBC {@link Connection}
      * @param dbName database name
      * @throws Exception on create database error
      */
@@ -118,58 +116,58 @@ public class MySQLJdbcUtils {
         if (!checkDbExist(conn, dbName)) {
             final String createDbSql = MySQLSqlBuilder.buildCreateDbSql(dbName);
             executeSql(conn, createDbSql);
-            LOG.info("execute sql [{}] success", createDbSql);
+            LOGGER.info("execute sql [{}] success", createDbSql);
         } else {
-            LOG.info("The database [{}] are exists", dbName);
+            LOGGER.info("The database [{}] are exists", dbName);
         }
     }
 
     /**
      * Check database from the MySQL information_schema.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
      * @return true if table exist, otherwise false
      * @throws Exception on check database exist error
      */
     public static boolean checkDbExist(final Connection conn, final String dbName) throws Exception {
-        boolean result = false;
         final String checkDbSql = MySQLSqlBuilder.getCheckDatabase(dbName);
         try (Statement stmt = conn.createStatement();
                 ResultSet resultSet = stmt.executeQuery(checkDbSql)) {
             if (Objects.nonNull(resultSet)) {
                 if (resultSet.next()) {
+                    LOGGER.info("check db exist for db={}, result=true", dbName);
                     return true;
                 }
             }
         }
-        LOG.info("check db exist for db={}, result={}", dbName, result);
-        return result;
+        LOGGER.info("check db exist for db={}, result=false", dbName);
+        return false;
     }
 
     /**
      * Create MySQL table by MySQLTableInfo
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param tableInfo MySQL table info  {@link MySQLTableInfo}
+     * @param conn JDBC {@link Connection}
+     * @param tableInfo table info  {@link MySQLTableInfo}
      * @throws Exception on create table error
      */
     public static void createTable(final Connection conn, final MySQLTableInfo tableInfo) throws Exception {
         if (checkTablesExist(conn, tableInfo.getDbName(), tableInfo.getTableName())) {
-            LOG.info("The table [{}] are exists", tableInfo.getTableName());
+            LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
         } else {
             final String createTableSql = MySQLSqlBuilder.buildCreateTableSql(tableInfo);
             executeSql(conn, createTableSql);
-            LOG.info("execute sql [{}] success", createTableSql);
+            LOGGER.info("execute sql [{}] success", createTableSql);
         }
     }
 
     /**
      * Check tables from the MySQL information_schema.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
      * @return true if table exist, otherwise false
      * @throws Exception on check table exist error
      */
@@ -185,23 +183,22 @@ public class MySQLJdbcUtils {
                 }
             }
         }
-        LOG.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
+        LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
         return result;
     }
 
     /**
-     * Check whether the column exists in the table.
+     * Check whether the column exists in the MySQL table.
      *
      * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
-     * @param column MySQL table column name
+     * @param dbName database name
+     * @param tableName table name
+     * @param column table column name
      * @return true if column exist in the table, otherwise false
      * @throws Exception on check column exist error
      */
     public static boolean checkColumnExist(final Connection conn, final String dbName, final String tableName,
-            final String column)
-            throws Exception {
+            final String column) throws Exception {
         boolean result = false;
         final String checkTableSql = MySQLSqlBuilder.getCheckColumn(dbName, tableName, column);
         try (Statement stmt = conn.createStatement();
@@ -212,16 +209,16 @@ public class MySQLJdbcUtils {
                 }
             }
         }
-        LOG.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
+        LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
         return result;
     }
 
     /**
-     * Query all columns of the tableName.
+     * Query all MySQL table columns by the given tableName.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
      * @return {@link List}
      * @throws Exception on get columns error
      */
@@ -247,9 +244,9 @@ public class MySQLJdbcUtils {
      * Add columns for MySQL table.
      *
      * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
-     * @param columns MySQL columns to be added
+     * @param dbName database name
+     * @param tableName table name
+     * @param columns columns to be added
      * @throws Exception on add columns error
      */
     public static void addColumns(final Connection conn, final String dbName, final String tableName,
@@ -264,4 +261,5 @@ public class MySQLJdbcUtils {
         final List<String> addColumnSql = MySQLSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos);
         executeSqlBatch(conn, addColumnSql);
     }
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index d60232ac0..6c859cf2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.resource.sink.mysql;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -90,19 +90,19 @@ public class MySQLResourceOperator implements SinkResourceOperator {
             columnList.add(columnInfo);
         }
 
-        final MySQLSinkDTO mySQLSink = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
-        final MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(mySQLSink, columnList);
-
-        try (Connection conn = MySQLJdbcUtils.getConnection(mySQLSink.getJdbcUrl(), mySQLSink.getUsername(),
-                mySQLSink.getPassword())) {
+        MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+        MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList);
+        try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
+                sinkDTO.getPassword())) {
             // 1. create database if not exists
             MySQLJdbcUtils.createDb(conn, tableInfo.getDbName());
             // 2. table not exists, create it
             MySQLJdbcUtils.createTable(conn, tableInfo);
             // 3. table exists, add columns - skip the exists columns
             MySQLJdbcUtils.addColumns(conn, tableInfo.getDbName(), tableInfo.getTableName(), columnList);
+
             // 4. update the sink status to success
-            final String info = "success to create MySQL resource";
+            String info = "success to create MySQL resource";
             sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
             LOG.info(info + " for sinkInfo={}", sinkInfo);
         } catch (Throwable e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 2449b835e..08ec23226 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -78,16 +78,16 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
             throws Exception {
         if (isStream) {
-            LOGGER.warn("stream workflow no need to build sort config for disable zk");
+            LOGGER.warn("no need to build sort config for stream process when disable zk");
             return;
         }
         if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
-            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
+            LOGGER.warn("no need to build sort config as the group is null or streams is empty when disable zk");
             return;
         }
 
-        GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
-        String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
+        GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos);
+        String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo);
         this.addToGroupExt(groupInfo, dataflow);
 
         if (LOGGER.isDebugEnabled()) {
@@ -127,9 +127,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
                 if (CollectionUtils.isNotEmpty(transformResponses)) {
                     relations = NodeRelationUtils.createNodeRelations(inlongStream);
-
-                    // in standard mode, replace upstream source node and transform input fields node to mq node
-                    // mq node name, which is inlong stream id
+                    // in standard mode, replace upstream source node and transform input fields node
+                    // to MQ node (which is inlong stream id)
                     String mqNodeName = sources.get(0).getSourceName();
                     Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses);
                     adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 6247822b3..a3e68b8b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -236,7 +236,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         }
     }
 
-    private void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
+    protected void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
         Integer sourceId = entity.getId();
         if (CollectionUtils.isEmpty(fieldInfos)) {
             return;
@@ -270,7 +270,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         streamFieldMapper.insertAll(list);
     }
 
-    private void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
+    protected void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
         LOGGER.info("begin to save source fields={}", fieldInfos);
         if (CollectionUtils.isEmpty(fieldInfos)) {
             return;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
index ddcdc0dde..e94cf2276 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
@@ -36,7 +36,6 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -44,6 +43,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operate the source snapshot
  */
@@ -51,14 +55,15 @@ import java.util.concurrent.TimeUnit;
 public class SourceSnapshotOperator implements AutoCloseable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperator.class);
-    private final ExecutorService executorService = new ThreadPoolExecutor(
-            1,
-            1,
-            10L,
-            TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(100),
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(),
             new CallerRunsPolicy());
+
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
 
@@ -101,7 +106,7 @@ public class SourceSnapshotOperator implements AutoCloseable {
             snapshotQueue = new LinkedBlockingQueue<>(queueSize);
         }
         SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable();
-        this.executorService.execute(taskRunnable);
+        EXECUTOR_SERVICE.execute(taskRunnable);
         LOGGER.info("source snapshot operate thread started successfully");
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 12ebe6200..c4af55daa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -142,6 +142,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
                         && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
                     pulsarSource.setSerializationType(sourceInfo.getSerializationType());
                 }
+                // currently, only reuse the primary key from Kafka source
                 if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) {
                     pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
                 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index eaa05ba41..43fff8cdd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -42,6 +42,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operation related to inlong stream process
  */
@@ -49,12 +54,12 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class InlongStreamProcessService {
 
-    private final ExecutorService executorService = new ThreadPoolExecutor(
-            20,
-            40,
-            0L,
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
             new CallerRunsPolicy());
 
@@ -100,7 +105,7 @@ public class InlongStreamProcessService {
             ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(() -> workflowService.start(processName, operator, processForm));
+            EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
             return true;
         }
     }
@@ -143,7 +148,7 @@ public class InlongStreamProcessService {
             ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(() -> workflowService.start(processName, operator, processForm));
+            EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
             return true;
         }
     }
@@ -158,8 +163,7 @@ public class InlongStreamProcessService {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
         }
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
-        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
-                && groupStatus != GroupStatus.RESTARTED) {
+        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
             throw new BusinessException(
                     String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId));
         }
@@ -185,7 +189,7 @@ public class InlongStreamProcessService {
             ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(() -> workflowService.start(processName, operator, processForm));
+            EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
             return true;
         }
     }
@@ -235,7 +239,7 @@ public class InlongStreamProcessService {
                 return false;
             }
         } else {
-            executorService.execute(() -> {
+            EXECUTOR_SERVICE.execute(() -> {
                 WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
                 ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
                 if (processStatus == ProcessStatus.COMPLETED) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 2afa702ca..633d46fca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -504,7 +504,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     /**
      * Update field information
-     * <p/>First physically delete the existing field information, and then add the field information of this batch
+     * <p/>
+     * First physically delete the existing field information, and then add the field information of this batch
      */
     @Transactional(rollbackFor = Throwable.class)
     void updateField(String groupId, String streamId, List<StreamField> fieldList) {
@@ -525,7 +526,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
-        infoList.stream().forEach(streamField -> streamField.setId(null));
+        infoList.forEach(streamField -> streamField.setId(null));
         List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList,
                 InlongStreamFieldEntity::new);
         for (InlongStreamFieldEntity entity : list) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
index 9306364bc..d07626cae 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
@@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -79,8 +80,8 @@ public class RestTemplateConfig {
     public PoolingHttpClientConnectionManager httpClientConnectionManager() {
         // Support HTTP, HTTPS
         Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
-                .register("http", PlainConnectionSocketFactory.getSocketFactory())
-                .register("https", SSLConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
                 .build();
         PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
                 registry);
diff --git a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
index b02b1d211..862eba470 100644
--- a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
+++ b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
@@ -18,7 +18,7 @@
 #
 
 # Log level
-logging.level.root=INFO
+logging.level.root=info
 logging.level.org.apache.inlong.manager=debug
 
 spring.sql.init.platform=h2
@@ -31,7 +31,7 @@ spring.datasource.username=root
 spring.datasource.password=""
 
 # Audit configuration
-# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH]
+# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
 audit.query.source=MYSQL
 
 # Elasticsearch config
@@ -47,7 +47,7 @@ es.auth.user=admin
 es.auth.password=inlong
 
 # ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
 audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 # ClickHouse username
 audit.ck.username=default
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
index 9eae25d0e..3b0f6d107 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
@@ -62,7 +62,7 @@ public class OpenAPIFilter implements Filter {
             SecretToken token = parseBasicAuth(httpServletRequest);
             subject.login(token);
         } catch (Exception ex) {
-            LOGGER.error("login error, msg: {}", ex.getMessage());
+            LOGGER.error("login error: {}", ex.getMessage());
             ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage());
             return;
         }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
index 1e0b8c976..5408daefc 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
@@ -70,7 +70,7 @@ public class AuthenticationFilter implements Filter {
             UsernamePasswordToken token = getPasswordToken(servletRequest);
             subject.login(token);
         } catch (Exception ex) {
-            LOGGER.error("login error, msg: {}", ex.getMessage());
+            LOGGER.error("login error: {}", ex.getMessage());
             ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage());
             return;
         }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
index 68022eef1..79380d0d8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.web.config;
 
-import java.nio.charset.StandardCharsets;
-import java.util.List;
 import lombok.Data;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -31,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -43,6 +42,9 @@ import org.springframework.util.ObjectUtils;
 import org.springframework.web.client.DefaultResponseErrorHandler;
 import org.springframework.web.client.RestTemplate;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
 @Data
 @Configuration
 @ConditionalOnMissingBean(RestTemplate.class)
@@ -84,8 +86,8 @@ public class RestTemplateConfig {
     public PoolingHttpClientConnectionManager httpClientConnectionManager() {
         // Support HTTP, HTTPS
         Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
-                .register("http", PlainConnectionSocketFactory.getSocketFactory())
-                .register("https", SSLConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
                 .build();
         PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
                 registry);
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index b11bfdf1f..dfac2d818 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -44,6 +44,7 @@ spring.datasource.druid.testOnReturn=false
 spring.datasource.druid.filters=stat,wall
 # Open the mergeSql function through the connectProperties property, Slow SQL records
 spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
 # Exclude ElasticsearchRestClientAutoConfiguration
 spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
 
@@ -64,7 +65,7 @@ es.auth.user=admin
 es.auth.password=inlong
 
 # ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
 audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 # ClickHouse username
 audit.ck.username=default
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 203124fc2..53dd907c1 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -32,7 +32,7 @@ spring.datasource.druid.initialSize=20
 spring.datasource.druid.minIdle=20
 spring.datasource.druid.maxActive=300
 # Configure the timeout period to wait for the connection to be acquired
-spring.datasource.druid.maxWait=6000
+spring.datasource.druid.maxWait=600000
 # Configure the minimum survival time of a connection in the pool, in milliseconds
 spring.datasource.druid.minEvictableIdleTimeMillis=3600000
 # Detect when applying for connection. It is recommended to configure it to true, which does not affect performance and ensures safety
@@ -45,6 +45,7 @@ spring.datasource.druid.testOnReturn=false
 spring.datasource.druid.filters=stat,wall
 # Open the mergeSql function through the connectProperties property, Slow SQL records
 spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
 # Exclude ElasticsearchRestClientAutoConfiguration
 spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
 
@@ -65,7 +66,7 @@ es.auth.user=admin
 es.auth.password=inlong
 
 # ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
 audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 # ClickHouse username
 audit.ck.username=default
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
index 581024506..2bb692925 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
@@ -29,6 +29,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Process event listener
  */
@@ -43,11 +48,11 @@ public interface ProcessEventListener extends EventListener<ProcessEvent> {
      * Async process common thread pool
      */
     ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
-            20,
-            40,
-            0L,
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("inlong-workflow-%s").build(),
             new CallerRunsPolicy());
 
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index ac61cfb5b..d157e9ecb 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -105,21 +105,23 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         WorkflowContext.ActionContext actionContext = context.getActionContext();
         if (actionContext == null) {
             resetActionContext(context);
+            actionContext = context.getActionContext();
         }
-        WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity();
-        Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())),
-                "task status should allow complete");
+        WorkflowTaskEntity taskEntity = actionContext.getTaskEntity();
+        Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(taskEntity.getStatus())),
+                String.format("task status %s not allowed to complete", taskEntity.getStatus()));
+
         try {
             ListenerResult listenerResult = this.taskEventNotifier.notify(TaskEvent.COMPLETE, context);
             if (!listenerResult.isSuccess()) {
-                failedTask(context, workflowTaskEntity);
+                failedTask(context, taskEntity);
             } else {
-                completeTaskEntity(context, workflowTaskEntity, TaskStatus.COMPLETED);
+                completeTaskEntity(context, taskEntity, TaskStatus.COMPLETED);
             }
             return listenerResult.isSuccess();
         } catch (Exception e) {
-            log.error("Complete service task failed", e);
-            failedTask(context, workflowTaskEntity);
+            log.error("failed to complete service task: " + taskEntity, e);
+            failedTask(context, taskEntity);
             return false;
         }
     }
@@ -139,12 +141,14 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         taskQuery.setProcessId(processId);
         taskQuery.setName(serviceName);
         List<WorkflowTaskEntity> taskEntities = taskEntityMapper.selectByQuery(taskQuery);
+
         WorkflowTaskEntity taskEntity;
         if (CollectionUtils.isEmpty(taskEntities)) {
             taskEntity = saveTaskEntity(serviceTask, context);
         } else {
             taskEntity = taskEntities.get(0);
         }
+
         ActionContext actionContext = new WorkflowContext.ActionContext()
                 .setTask((WorkflowTask) context.getCurrentElement())
                 .setAction(WorkflowAction.COMPLETE)
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index d3b26786e..2b98103a8 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.workflow.processor;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
@@ -40,11 +40,10 @@ import java.util.Date;
 /**
  * Start event handler
  */
+@Slf4j
 @Service
 public class StartEventProcessor extends AbstractNextableElementProcessor<StartEvent> {
 
-    @Autowired
-    private ObjectMapper objectMapper;
     @Autowired
     private ProcessEventNotifier processEventNotifier;
     @Autowired
@@ -61,13 +60,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         WorkflowProcess process = context.getProcess();
         ProcessForm form = context.getProcessForm();
         if (process.getFormClass() != null) {
-            Preconditions.checkNotNull(form, "form cannot be null");
+            Preconditions.checkNotNull(form, "process form cannot be null");
             Preconditions.checkTrue(form.getClass().isAssignableFrom(process.getFormClass()),
-                    "form type not match, should be class " + process.getFormClass());
+                    String.format("form type %s should match the process form type %s",
+                            form.getClass(), process.getFormClass()));
             form.validate();
         } else {
-            Preconditions.checkNull(form, "no form required");
+            log.warn("not need to provide the form info");
+            context.setProcessForm(null);
         }
+
         WorkflowProcessEntity processEntity = saveProcessEntity(applicant, process, form);
         context.setProcessEntity(processEntity);
         context.setActionContext(new WorkflowContext.ActionContext().setAction(WorkflowAction.START));
@@ -91,18 +93,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         processEntity.setDisplayName(process.getDisplayName());
         processEntity.setType(process.getType());
         processEntity.setTitle(form.getTitle());
+
         processEntity.setInlongGroupId(form.getInlongGroupId());
         if (form instanceof StreamResourceProcessForm) {
             StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
             processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId());
         }
+
         processEntity.setApplicant(applicant);
         processEntity.setStatus(ProcessStatus.PROCESSING.name());
-        try {
-            processEntity.setFormData(objectMapper.writeValueAsString(form));
-        } catch (Exception e) {
-            throw new JsonException("write form to json error: ", e);
-        }
+        processEntity.setFormData(JsonUtils.toJsonString(form));
         processEntity.setStartTime(new Date());
         processEntity.setHidden(process.getHidden());
 


[inlong] 04/04: [INLONG-6487][Manager] Add API to force delete the stream source (#6489)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d061d9a4e3d03790b6f3349f1dcd0f9dac71d303
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Nov 10 21:29:25 2022 +0800

    [INLONG-6487][Manager] Add API to force delete the stream source (#6489)
---
 .../inlong/manager/client/api/StreamSource.java    | 30 +++++++++++++++
 .../manager/client/api/impl/StreamSourceImpl.java  | 43 ++++++++++++++++++++++
 .../api/inner/client/StreamSourceClient.java       | 17 +++++++++
 .../client/api/service/StreamSourceApi.java        |  4 ++
 .../dao/mapper/StreamSourceEntityMapper.java       |  3 ++
 .../dao/mapper/StreamSourceFieldEntityMapper.java  | 11 ++++--
 .../resources/mappers/StreamSourceEntityMapper.xml | 17 ++++++++-
 .../mappers/StreamSourceFieldEntityMapper.xml      | 15 +++++---
 .../service/source/StreamSourceService.java        | 10 +++++
 .../service/source/StreamSourceServiceImpl.java    | 14 +++++++
 .../web/controller/StreamSourceController.java     | 15 +++++++-
 11 files changed, 168 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
new file mode 100644
index 000000000..07927efd5
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api;
+
+public interface StreamSource {
+
+    /**
+     * Force deletes the stream source by groupId and streamId
+     *
+     * @param groupId The belongs group id.
+     * @param streamId The belongs stream id.
+     * @return Whether succeed
+     */
+    Boolean forceDelete(String groupId, String streamId);
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java
new file mode 100644
index 000000000..3ed1a0437
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+public class StreamSourceImpl implements StreamSource {
+
+    private final StreamSourceClient sourceClient;
+
+    public StreamSourceImpl(ClientConfiguration configuration) {
+        ClientFactory clientFactory = ClientUtils.getClientFactory(configuration);
+        this.sourceClient = clientFactory.getSourceClient();
+    }
+
+    @Override
+    public Boolean forceDelete(String groupId, String streamId) {
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+        return sourceClient.forceDelete(groupId, streamId);
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
index f1eb4a87d..f98635066 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.StreamSourceApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -88,6 +89,22 @@ public class StreamSourceClient {
         return response.getData();
     }
 
+    /**
+     * Force deletes the stream source by groupId and streamId
+     *
+     * @param groupId The belongs group id.
+     * @param streamId The belongs stream id.
+     * @return Whether succeed
+     */
+    public boolean forceDelete(String groupId, String streamId) {
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+        Response<Boolean> response = ClientUtils.executeHttpCall(streamSourceApi.forceDelete(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     public StreamSource get(int id) {
         Preconditions.checkTrue(id > 0, "sourceId is illegal");
         Response<StreamSource> response = ClientUtils.executeHttpCall(streamSourceApi.get(id));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
index 5c0fe8c15..33c933486 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
@@ -44,6 +44,10 @@ public interface StreamSourceApi {
     @DELETE("source/delete/{id}")
     Call<Response<Boolean>> deleteSource(@Path("id") Integer sourceId);
 
+    @DELETE("source/forceDelete")
+    Call<Response<Boolean>> forceDelete(@Query("inlongGroupId") String groupId,
+            @Query("inlongStreamId") String streamId);
+
     @GET("source/get/{id}")
     Call<Response<StreamSource>> get(@Path("id") Integer id);
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 6e851e11c..aee466d88 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -105,6 +105,9 @@ public interface StreamSourceEntityMapper {
 
     int updateByPrimaryKeySelective(StreamSourceEntity record);
 
+    int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+            @Param("status") Integer status);
+
     int updateByPrimaryKey(StreamSourceEntity record);
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index 0059e8362..0e67a9f77 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -26,14 +26,12 @@ import java.util.List;
 @Repository
 public interface StreamSourceFieldEntityMapper {
 
-    int deleteByPrimaryKey(Integer id);
-
     int insert(StreamSourceFieldEntity record);
 
     int insertSelective(StreamSourceFieldEntity record);
 
     /**
-     * Selete undeleted source field by source id.
+     * Select undeleted source field by source id.
      *
      * @param sourceId source id
      * @return stream source field list
@@ -44,6 +42,13 @@ public interface StreamSourceFieldEntityMapper {
 
     int updateByPrimaryKey(StreamSourceFieldEntity record);
 
+    /**
+     * Logically delete all stream source fields based on inlong group id and inlong stream id
+     *
+     * @return rows deleted
+     */
+    int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
     /**
      * Insert all field list
      *
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7b1d21071..e1e7f56a8 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -60,7 +60,7 @@
                                    previous_status, creator, modifier)
         values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
-                #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR},
+                #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR},
                 #{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR},
                 #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
@@ -271,6 +271,19 @@
         </where>
     </select>
 
+    <update id="updateByRelatedId">
+        update stream_source
+        <set>
+            is_deleted = id,
+            previous_status = status,
+            status = #{status, jdbcType=INTEGER},
+            version = version + 1
+        </set>
+        where is_deleted = 0
+        and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </update>
+
     <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         update stream_source
         <set>
@@ -351,7 +364,7 @@
             creator             = #{creator,jdbcType=VARCHAR},
             modifier            = #{modifier,jdbcType=VARCHAR}
         where id = #{id,jdbcType=INTEGER}
-        and version = #{version,jdbcType=INTEGER}
+          and version = #{version,jdbcType=INTEGER}
     </update>
     <update id="updateStatus">
         update stream_source
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index f4ba00366..43cbc1187 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -48,11 +48,7 @@
         where source_id = #{sourceId,jdbcType=INTEGER}
         and is_deleted = 0
     </select>
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-        delete
-        from stream_source_field
-        where id = #{id,jdbcType=INTEGER}
-    </delete>
+
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
@@ -231,6 +227,15 @@
             is_deleted       = #{isDeleted,jdbcType=INTEGER}
         where id = #{id,jdbcType=INTEGER}
     </update>
+    <update id="updateByRelatedId">
+        update stream_source_field
+        <set>
+            is_deleted = id
+        </set>
+        where is_deleted = 0
+        and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </update>
 
     <insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         insert into stream_source_field (id, inlong_group_id,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index f0940cecb..8037c2994 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -116,6 +116,16 @@ public interface StreamSourceService {
      */
     Boolean delete(Integer id, String operator);
 
+    /**
+     * Force deletes the stream source by groupId and streamId
+     *
+     * @param groupId The belongs group id.
+     * @param streamId The belongs stream id.
+     * @param operator Operator's name
+     * @return Whether succeed
+     */
+    Boolean forceDelete(String groupId, String streamId, String operator);
+
     /**
      * Delete the stream source by the given id and source type.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2f2214ce1..f3782a7cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -267,6 +267,20 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         return true;
     }
 
+    @Override
+    public Boolean forceDelete(String groupId, String streamId, String operator) {
+        LOGGER.info("begin to force delete source for groupId={} and streamId={} by user={}",
+                groupId, streamId, operator);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+        int sourceCount = sourceMapper.updateByRelatedId(groupId, streamId, SourceStatus.SOURCE_DISABLE.getCode());
+        int fieldCount = sourceFieldMapper.updateByRelatedId(groupId, streamId);
+        LOGGER.info("success to force delete source for groupId={} and streamId={} by user={},"
+                        + " update {} sources and {} fields", groupId, streamId, operator, sourceCount, fieldCount);
+        return true;
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
             isolation = Isolation.READ_COMMITTED)
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index acfdbd1ca..766fd9779 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -36,6 +37,7 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -85,4 +87,15 @@ public class StreamSourceController {
         return Response.success(result);
     }
 
-}
\ No newline at end of file
+    @RequestMapping(value = "/source/forceDelete", method = RequestMethod.DELETE)
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiOperation(value = "Force delete stream source by groupId and streamId")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
+    })
+    public Response<Boolean> forceDelete(@RequestParam String groupId, @RequestParam String streamId) {
+        return Response.success(sourceService.forceDelete(groupId, streamId, LoginUserUtils.getLoginUser().getName()));
+    }
+
+}


[inlong] 02/04: [INLONG-6493][Dashboard] After the Stream is configured successfully, some parameters cannot be modified (#6494)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 80ae93733927d1265947e264578076d644439971
Author: Daniel <le...@apache.org>
AuthorDate: Thu Nov 10 15:10:01 2022 +0800

    [INLONG-6493][Dashboard] After the Stream is configured successfully, some parameters cannot be modified (#6494)
---
 .../src/metas/streams/common/StreamDefaultInfo.ts  | 35 ++++++++++++++--------
 .../GroupDetail/DataStream/StreamItemModal.tsx     | 14 ++-------
 2 files changed, 25 insertions(+), 24 deletions(-)

diff --git a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
index 773a51b19..fd39a8cc9 100644
--- a/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
+++ b/inlong-dashboard/src/metas/streams/common/StreamDefaultInfo.ts
@@ -38,9 +38,10 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
 
   @FieldDecorator({
     type: 'input',
-    props: {
-      maxLength: 32,
-    },
+    props: values => ({
+      disabled: Boolean(values?.status),
+      maxLength: 64,
+    }),
     rules: [
       { required: true },
       {
@@ -97,7 +98,8 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
     type: 'radio',
     initialValue: 'CSV',
     tooltip: i18n.t('meta.Stream.DataTypeHelp'),
-    props: {
+    props: values => ({
+      disabled: [110, 130].includes(values?.status),
       options: [
         {
           label: 'CSV',
@@ -108,7 +110,7 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
           value: 'RAW_CSV',
         },
       ],
-    },
+    }),
     rules: [{ required: true }],
   })
   @I18n('meta.Stream.DataType')
@@ -117,7 +119,8 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
   @FieldDecorator({
     type: 'radio',
     initialValue: 'UTF-8',
-    props: {
+    props: values => ({
+      disabled: [110, 130].includes(values?.status),
       options: [
         {
           label: 'UTF-8',
@@ -128,7 +131,7 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
           value: 'GBK',
         },
       ],
-    },
+    }),
     rules: [{ required: true }],
   })
   @I18n('meta.Stream.DataEncoding')
@@ -137,7 +140,8 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
   @FieldDecorator({
     type: 'select',
     initialValue: '124',
-    props: {
+    props: values => ({
+      disabled: [110, 130].includes(values?.status),
       dropdownMatchSelectWidth: false,
       options: [
         {
@@ -170,7 +174,7 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
         placeholder: 'ASCII',
       },
       style: { width: 100 },
-    },
+    }),
     rules: [
       {
         required: true,
@@ -186,8 +190,9 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
 
   @FieldDecorator({
     type: EditableTable,
-    props: {
+    props: values => ({
       size: 'small',
+      canDelete: record => !(record.id && [110, 130].includes(values?.status)),
       columns: [
         {
           title: i18n.t('meta.Stream.FieldName'),
@@ -199,15 +204,19 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
               message: i18n.t('meta.Stream.FieldNameRule'),
             },
           ],
+          props: (text, record) => ({
+            disabled: record.id && [110, 130].includes(values?.status),
+          }),
         },
         {
           title: i18n.t('meta.Stream.FieldType'),
           dataIndex: 'fieldType',
           type: 'select',
           initialValue: sourceFieldsTypes[0].value,
-          props: {
+          props: (text, record) => ({
+            disabled: record.id && [110, 130].includes(values?.status),
             options: sourceFieldsTypes,
-          },
+          }),
           rules: [{ required: true }],
         },
         {
@@ -215,7 +224,7 @@ export class StreamDefaultInfo implements DataWithBackend, RenderRow, RenderList
           dataIndex: 'fieldComment',
         },
       ],
-    },
+    }),
   })
   @I18n('meta.Stream.SourceDataField')
   rowTypeFields: Record<string, string>[];
diff --git a/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx b/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx
index 08c4af720..2c78e0e31 100644
--- a/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx
+++ b/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx
@@ -96,17 +96,8 @@ const Comp: React.FC<Props> = ({ inlongGroupId, inlongStreamId, mqType, ...modal
         },
         visible: mqType === 'PULSAR',
       },
-    ].map(item => {
-      const obj = { ...item };
-      const isCreate = !inlongStreamId;
-
-      if (!isCreate && (obj.name === 'inlongStreamId' || obj.name === 'dataType')) {
-        obj.type = 'text';
-      }
-
-      return obj;
-    });
-  }, [entityFields, mqType, inlongStreamId]);
+    ];
+  }, [entityFields, mqType]);
 
   const { data: savedData, run: getStreamData } = useRequest(
     {
@@ -160,6 +151,7 @@ const Comp: React.FC<Props> = ({ inlongGroupId, inlongStreamId, mqType, ...modal
       <FormGenerator
         labelCol={{ span: 4 }}
         wrapperCol={{ span: 20 }}
+        initialValues={inlongStreamId ? savedData : {}}
         content={formContent}
         form={form}
         useMaxWidth


[inlong] 01/04: [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b14dbfa7b74e82275cd9178dcb5973aa01469550
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Nov 10 14:46:44 2022 +0800

    [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)
    
    * Distinguish config processes between the InlongGroup and InlongStream
    
    * Use common thread pool for UpdateGroupCompleteListener
    
    * Modify the source state according to lightweight
    
    * Modify the asynchronous process logic of inlong stream
    
    * Allow configuration without inlong stream
    
    * Determine whether to configure Sort according to isStream param
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../manager/dao/entity/WorkflowProcessEntity.java  |  1 +
 .../mappers/WorkflowProcessEntityMapper.xml        | 27 +++++----
 .../manager/pojo/workflow/ProcessRequest.java      |  3 +
 .../manager/pojo/workflow/TaskLogRequest.java      |  3 +
 .../form/process/StreamResourceProcessForm.java    | 13 +++++
 .../listener/group/InitGroupCompleteListener.java  | 24 +++-----
 .../service/listener/group/InitGroupListener.java  |  4 --
 .../listener/queue/QueueResourceListener.java      | 67 ++++++++++++++++++++++
 .../queue/StreamQueueResourceListener.java         |  3 -
 .../service/listener/sort/SortConfigListener.java  |  5 ++
 .../stream/InitStreamCompleteListener.java         |  7 ++-
 .../queue/kafka/KafkaResourceOperators.java        | 25 +-------
 .../queue/pulsar/PulsarResourceOperator.java       | 16 +-----
 .../resource/sort/DefaultSortConfigOperator.java   | 10 ++--
 .../service/stream/InlongStreamProcessService.java | 20 +++----
 .../service/workflow/WorkflowServiceImpl.java      |  1 +
 .../group/CreateGroupWorkflowDefinition.java       | 24 +-------
 .../group/DeleteGroupWorkflowDefinition.java       |  8 ++-
 .../stream/CreateStreamWorkflowDefinition.java     |  4 +-
 .../stream/DeleteStreamWorkflowDefinition.java     |  4 +-
 .../group/CreateGroupWorkflowDefinitionTest.java   |  4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  3 +-
 .../workflow/processor/StartEventProcessor.java    |  5 ++
 24 files changed, 162 insertions(+), 122 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
index 87e92bf4a..b295e1e40 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
@@ -34,6 +34,7 @@ public class WorkflowProcessEntity {
     private String title;
 
     private String inlongGroupId;
+    private String inlongStreamId;
     private String applicant;
     private String status;
     private String formData;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
index 1969918b4..8b122ab12 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
@@ -27,6 +27,7 @@
         <result column="type" jdbcType="VARCHAR" property="type"/>
         <result column="title" jdbcType="VARCHAR" property="title"/>
         <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
         <result column="applicant" jdbcType="VARCHAR" property="applicant"/>
         <result column="status" jdbcType="VARCHAR" property="status"/>
         <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
@@ -36,22 +37,21 @@
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, name, display_name, type, title, inlong_group_id, applicant,
-        status, start_time, end_time, hidden, form_data, ext_params
+        id, name, display_name, type, title, inlong_group_id, inlong_stream_id,
+        applicant, status, start_time, end_time, hidden, form_data, ext_params
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id" keyColumn="id"
             parameterType="org.apache.inlong.manager.dao.entity.WorkflowProcessEntity">
-        insert into workflow_process (name, display_name,
-                                      type, title, inlong_group_id,
-                                      applicant, status,
-                                      start_time, end_time,
-                                      form_data, ext_params, hidden)
-        values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR},
-                #{type,jdbcType=VARCHAR}, #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR},
-                #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR},
-                #{startTime,jdbcType=TIMESTAMP}, #{endTime,jdbcType=TIMESTAMP},
-                #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{hidden,jdbcType=TINYINT})
+        insert into workflow_process (name, display_name, type,
+                                      title, inlong_group_id, inlong_stream_id,
+                                      applicant, status, start_time,
+                                      end_time, form_data, ext_params, hidden)
+        values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
+                #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+                #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR}, #{startTime,jdbcType=TIMESTAMP},
+                #{endTime,jdbcType=TIMESTAMP}, #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{hidden,jdbcType=TINYINT})
     </insert>
 
     <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -87,6 +87,9 @@
             <if test="inlongGroupId != null and inlongGroupId !=''">
                 and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
             </if>
+            <if test="inlongStreamId != null and inlongStreamId !=''">
+                and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+            </if>
             <if test="applicant != null and applicant !=''">
                 and applicant = #{applicant,jdbcType=VARCHAR}
             </if>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
index a9b5138e8..585de747a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
@@ -63,6 +63,9 @@ public class ProcessRequest extends PageRequest {
     @ApiModelProperty("Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty("Inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty("Start time-lower limit")
     @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date startTimeBegin;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
index 5f5011918..f3cc5a9b2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
@@ -36,6 +36,9 @@ public class TaskLogRequest extends PageRequest {
     @ApiModelProperty("Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty("Inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty("Process name list")
     private List<String> processNames;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
index 4c50e58e4..b175bc09d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
@@ -39,6 +39,18 @@ public class StreamResourceProcessForm extends BaseProcessForm {
 
     private GroupOperateType groupOperateType = GroupOperateType.INIT;
 
+    /**
+     * Get stream resource process form info.
+     */
+    public static StreamResourceProcessForm getProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            GroupOperateType operateType) {
+        StreamResourceProcessForm processForm = new StreamResourceProcessForm();
+        processForm.setGroupInfo(groupInfo);
+        processForm.setStreamInfo(streamInfo);
+        processForm.setGroupOperateType(operateType);
+        return processForm;
+    }
+
     @Override
     public void validate() throws FormValidateException {
 
@@ -53,4 +65,5 @@ public class StreamResourceProcessForm extends BaseProcessForm {
     public String getInlongGroupId() {
         return groupInfo.getInlongGroupId();
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index 944c740c7..ca5a375fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -18,27 +18,26 @@
 package org.apache.inlong.manager.service.listener.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
-import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * The listener of InlongGroup when created resources successfully.
  */
@@ -49,11 +48,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
-    private StreamSourceService sourceService;
-    @Autowired
     private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongStreamProcessService streamProcessService;
 
     @Override
     public ProcessEvent event() {
@@ -84,12 +81,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
         updateGroupRequest.setVersion(existGroup.getVersion());
         groupService.update(updateGroupRequest, operator);
 
-        // update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-            sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
-        } else {
-            sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        List<InlongStreamInfo> streamList = form.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamList) {
+            streamProcessService.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false);
         }
 
         log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
index d5c238fa8..fbef1c864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.listener.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -59,9 +58,6 @@ public class InitGroupListener implements ProcessEventListener {
         if (groupInfo == null) {
             throw new WorkflowListenerException("inlong group info cannot be null for init group process");
         }
-        if (CollectionUtils.isEmpty(form.getStreamInfos())) {
-            throw new WorkflowListenerException("inlong stream info list cannot be null for init group process");
-        }
         groupService.updateStatus(groupId, GroupStatus.CONFIG_ING.getCode(), context.getOperator());
 
         log.info("success to execute InitGroupListener for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index edba69c55..390c11b48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -17,22 +17,40 @@
 
 package org.apache.inlong.manager.service.listener.queue;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.enums.TaskStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.TaskResponse;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT;
+import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE;
+
 /**
  * Create message queue resources,
  * such as Pulsar Topic and Subscription, TubeMQ Topic and ConsumerGroup, etc.
@@ -41,10 +59,21 @@ import org.springframework.stereotype.Service;
 @Service
 public class QueueResourceListener implements QueueOperateListener {
 
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            20,
+            40,
+            10L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(),
+            new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
+            new CallerRunsPolicy());
+
     @Autowired
     private InlongGroupService groupService;
     @Autowired
     private QueueResourceOperatorFactory queueOperatorFactory;
+    @Autowired
+    private WorkflowService workflowService;
 
     @Override
     public TaskEvent event() {
@@ -82,7 +111,10 @@ public class QueueResourceListener implements QueueOperateListener {
         String operator = context.getOperator();
         switch (operateType) {
             case INIT:
+                // create queue resource for inlong group
                 queueOperator.createQueueForGroup(groupInfo, operator);
+                // create queue resource for all inlong streams under the inlong group
+                this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator);
                 break;
             case DELETE:
                 queueOperator.deleteQueueForGroup(groupInfo, operator);
@@ -96,4 +128,39 @@ public class QueueResourceListener implements QueueOperateListener {
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                String msg = "failed to execute stream process in asynchronously ";
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
+            }
+        }
+
+        log.info("success to start stream process for groupId={}", groupId);
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
index 21403ec21..eb3aaab8c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
@@ -29,7 +29,6 @@ import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -46,8 +45,6 @@ public class StreamQueueResourceListener implements QueueOperateListener {
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
     private QueueResourceOperatorFactory queueOperatorFactory;
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index d04a20852..08a310ddb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.sort;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -81,6 +82,10 @@ public class SortConfigListener implements SortOperateListener {
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
+            return ListenerResult.success();
+        }
         int sinkCount = streamInfos.stream()
                 .map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size())
                 .reduce(0, Integer::sum);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index be6e60fd6..141635c46 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.listener.stream;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
@@ -63,7 +64,11 @@ public class InitStreamCompleteListener implements ProcessEventListener {
         // Update status of other related configs
         streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
         streamService.update(streamInfo.genRequest(), operator);
-        sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
+            sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+        } else {
+            sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        }
 
         return ListenerResult.success();
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 4e2b7246a..2f65aa735 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -54,10 +54,10 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
     private KafkaOperator kafkaOperator;
     @Autowired
+    private InlongStreamService streamService;
+    @Autowired
     private InlongConsumeService consumeService;
 
     @Override
@@ -67,26 +67,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 23c13b740..2e379b285 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -107,20 +107,6 @@ public class PulsarResourceOperator implements QueueResourceOperator {
                     log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
                             groupId, namespace, clusterName);
                 }
-
-                // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
-                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-                if (streamInfoList == null || streamInfoList.isEmpty()) {
-                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
-                            groupId, clusterName);
-                    return;
-                }
-                // create pulsar topic and subscription
-                for (InlongStreamBriefInfo stream : streamInfoList) {
-                    this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
-                    this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(),
-                            stream.getInlongStreamId());
-                }
             } catch (Exception e) {
                 String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
                         pulsarCluster.toString());
@@ -184,7 +170,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
                         streamInfo.getMqResource(), streamId);
             } catch (Exception e) {
                 String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
-                        groupId, streamId,pulsarCluster.getName());
+                        groupId, streamId, pulsarCluster.getName());
                 log.error(msg, e);
                 throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index a4c6b5d80..2449b835e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -77,6 +77,10 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     @Override
     public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
             throws Exception {
+        if (isStream) {
+            LOGGER.warn("stream workflow no need to build sort config for disable zk");
+            return;
+        }
         if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
             LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
             return;
@@ -84,11 +88,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 
         GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
         String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
-        if (isStream) {
-            this.addToStreamExt(streamInfos, dataflow);
-        } else {
-            this.addToGroupExt(groupInfo, dataflow);
-        }
+        this.addToGroupExt(groupInfo, dataflow);
 
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index 06046eceb..eaa05ba41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -92,7 +92,8 @@ public class InlongStreamProcessService {
             throw new BusinessException(errMsg);
         }
 
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo,
+                streamInfo, GroupOperateType.INIT);
         ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -134,7 +135,8 @@ public class InlongStreamProcessService {
                     String.format("stream status=%s not support suspend stream for groupId=%s streamId=%s",
                             status, groupId, streamId));
         }
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+                GroupOperateType.SUSPEND);
         ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -175,7 +177,8 @@ public class InlongStreamProcessService {
                     String.format("stream status=%s not support restart stream for groupId=%s streamId=%s",
                             status, groupId, streamId));
         }
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+                GroupOperateType.RESTART);
         ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -220,7 +223,8 @@ public class InlongStreamProcessService {
                     String.format("stream status=%s not support delete stream for groupId=%s streamId=%s",
                             status, groupId, streamId));
         }
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+                GroupOperateType.DELETE);
         ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -242,12 +246,4 @@ public class InlongStreamProcessService {
         }
     }
 
-    private StreamResourceProcessForm genStreamProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            GroupOperateType operateType) {
-        StreamResourceProcessForm processForm = new StreamResourceProcessForm();
-        processForm.setGroupInfo(groupInfo);
-        processForm.setStreamInfo(streamInfo);
-        processForm.setGroupOperateType(operateType);
-        return processForm;
-    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index d3d6bac9a..b59842f41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -189,6 +189,7 @@ public class WorkflowServiceImpl implements WorkflowService {
 
         ProcessRequest processRequest = new ProcessRequest();
         processRequest.setInlongGroupId(groupId);
+        processRequest.setInlongStreamId(query.getInlongStreamId());
         processRequest.setNameList(processNameList);
         processRequest.setHidden(1);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 01d500bf0..94eb0c3fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -77,14 +77,6 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
         initMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initMQTask);
 
-        // Init Sink
-        ServiceTask initSinkTask = new ServiceTask();
-        initSinkTask.setName("InitSink");
-        initSinkTask.setDisplayName("Group-InitSink");
-        initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
-        initSinkTask.setListenerFactory(groupTaskListenerFactory);
-        process.addTask(initSinkTask);
-
         // Init Sort
         ServiceTask initSortTask = new ServiceTask();
         initSortTask.setName("InitSort");
@@ -93,25 +85,15 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
         initSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initSortTask);
 
-        // Init Source
-        ServiceTask initSourceTask = new ServiceTask();
-        initSourceTask.setName("InitSource");
-        initSourceTask.setDisplayName("Group-InitSource");
-        initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
-        initSourceTask.setListenerFactory(groupTaskListenerFactory);
-        process.addTask(initSourceTask);
-
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
-        // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source
+        // Task dependency order: 1.MQ -> 2.Sink-in-Stream -> 3.Sort -> 4.Source-in-Stream
         // To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort
         startEvent.addNext(initMQTask);
-        initMQTask.addNext(initSinkTask);
-        initSinkTask.addNext(initSortTask);
-        initSortTask.addNext(initSourceTask);
-        initSourceTask.addNext(endEvent);
+        initMQTask.addNext(initSortTask);
+        initSortTask.addNext(endEvent);
 
         return process;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 395a3dec9..5d3eb090e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.service.workflow.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.service.listener.group.UpdateGroupCompleteListener;
 import org.apache.inlong.manager.service.listener.group.UpdateGroupFailedListener;
 import org.apache.inlong.manager.service.listener.group.UpdateGroupListener;
-import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -93,6 +93,8 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
         deleteSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteSortTask);
 
+        // No need to delete the sink because we should not affect the existing data in the sink
+
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 79f154785..6ce7822b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory;
 import org.apache.inlong.manager.service.listener.stream.InitStreamCompleteListener;
 import org.apache.inlong.manager.service.listener.stream.InitStreamFailedListener;
 import org.apache.inlong.manager.service.listener.stream.InitStreamListener;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 7dd11ca4f..415d78f3f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory;
 import org.apache.inlong.manager.service.listener.stream.UpdateStreamCompleteListener;
 import org.apache.inlong.manager.service.listener.stream.UpdateStreamFailedListener;
 import org.apache.inlong.manager.service.listener.stream.UpdateStreamListener;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index 31c74a2cb..217d23841 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -37,11 +37,9 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
         WorkflowProcess cloneProcess1 = process.clone();
         WorkflowProcess cloneProcess2 = cloneProcess1.clone();
         Assertions.assertNotSame(cloneProcess2, cloneProcess1);
-        Assertions.assertNotNull(process.getTaskByName("InitSource"));
         Assertions.assertNotNull(process.getTaskByName("InitMQ"));
         Assertions.assertNotNull(process.getTaskByName("InitSort"));
-        Assertions.assertNotNull(process.getTaskByName("InitSink"));
-        Assertions.assertEquals(4, process.getNameToTaskMap().size());
+        Assertions.assertEquals(2, process.getNameToTaskMap().size());
     }
 
 }
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 951d854c9..53ece0c11 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -606,7 +606,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
     `display_name`    varchar(256) NOT NULL COMMENT 'Process display name',
     `type`            varchar(256)          DEFAULT NULL COMMENT 'Process classification',
     `title`           varchar(256)          DEFAULT NULL COMMENT 'Process title',
-    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
+    `inlong_stream_id`varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
     `applicant`       varchar(256) NOT NULL COMMENT 'Applicant',
     `status`          varchar(64)  NOT NULL COMMENT 'Status',
     `form_data`       mediumtext COMMENT 'Form information',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index fbd73961f..6f6a45fab 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -642,7 +642,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
     `display_name`    varchar(256) NOT NULL COMMENT 'Process display name',
     `type`            varchar(256)          DEFAULT NULL COMMENT 'Process classification',
     `title`           varchar(256)          DEFAULT NULL COMMENT 'Process title',
-    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
+    `inlong_stream_id`varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
     `applicant`       varchar(256) NOT NULL COMMENT 'Applicant',
     `status`          varchar(64)  NOT NULL COMMENT 'Status',
     `form_data`       mediumtext COMMENT 'Form information',
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index 9d989875e..d3b26786e 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.StartEvent;
@@ -91,6 +92,10 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         processEntity.setType(process.getType());
         processEntity.setTitle(form.getTitle());
         processEntity.setInlongGroupId(form.getInlongGroupId());
+        if (form instanceof StreamResourceProcessForm) {
+            StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+            processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId());
+        }
         processEntity.setApplicant(applicant);
         processEntity.setStatus(ProcessStatus.PROCESSING.name());
         try {