You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 13:45:36 UTC

[inlong] 03/04: [INLONG-6502][Manager] Optimize some log and code formats (#6503)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4e511b880eb57c30a0908fb7abd9e5276a9c2219
Author: healchow <he...@gmail.com>
AuthorDate: Thu Nov 10 19:03:00 2022 +0800

    [INLONG-6502][Manager] Optimize some log and code formats (#6503)
---
 .../manager/common/consts/InlongConstants.java     |   8 ++
 .../inlong/manager/common/enums/ClusterStatus.java |  15 +--
 .../inlong/manager/common/enums/GroupStatus.java   |   8 +-
 .../inlong/manager/dao/RestTemplateConfig.java     |   5 +-
 .../manager/plugin/flink/TaskRunService.java       |  50 ++++----
 .../plugin/listener/DeleteSortListener.java        |  24 ++--
 .../plugin/listener/DeleteStreamListener.java      |  27 ++---
 .../inlong/manager/plugin/util/FlinkUtils.java     |   2 +-
 .../manager/pojo/sink/mysql/MySQLSinkDTO.java      |  13 +-
 .../manager/service/core/SortConfigLoader.java     |  32 +++--
 .../manager/service/core/impl/AbstractService.java | 119 -------------------
 .../service/group/InlongGroupProcessService.java   |  23 ++--
 .../service/heartbeat/HeartbeatManager.java        |   2 +-
 .../listener/queue/QueueResourceListener.java      |  26 ++--
 .../service/listener/sort/SortConfigListener.java  |   6 +-
 .../service/operationlog/OperationLogPool.java     |  29 ++---
 .../queue/kafka/KafkaResourceOperators.java        |  23 ++--
 .../resource/queue/pulsar/PulsarOperator.java      |   2 +-
 .../queue/pulsar/PulsarResourceOperator.java       | 131 ++++++++++-----------
 .../resource/sink/es/ElasticsearchConfig.java      |  16 ++-
 .../resource/sink/mysql/MySQLJdbcUtils.java        |  88 +++++++-------
 .../resource/sink/mysql/MySQLResourceOperator.java |  18 +--
 .../resource/sort/DefaultSortConfigOperator.java   |  13 +-
 .../service/source/AbstractSourceOperator.java     |   4 +-
 .../service/source/SourceSnapshotOperator.java     |  21 ++--
 .../source/pulsar/PulsarSourceOperator.java        |   1 +
 .../service/stream/InlongStreamProcessService.java |  26 ++--
 .../service/stream/InlongStreamServiceImpl.java    |   5 +-
 .../inlong/manager/service/RestTemplateConfig.java |   5 +-
 .../resources/application-unit-test.properties     |   6 +-
 .../manager/web/auth/openapi/OpenAPIFilter.java    |   2 +-
 .../manager/web/auth/web/AuthenticationFilter.java |   2 +-
 .../manager/web/config/RestTemplateConfig.java     |  10 +-
 .../src/main/resources/application-prod.properties |   3 +-
 .../src/main/resources/application-test.properties |   5 +-
 .../event/process/ProcessEventListener.java        |  13 +-
 .../workflow/processor/ServiceTaskProcessor.java   |  18 +--
 .../workflow/processor/StartEventProcessor.java    |  24 ++--
 38 files changed, 365 insertions(+), 460 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 28c2435d2..5d0b0a13d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -22,6 +22,14 @@ package org.apache.inlong.manager.common.consts;
  */
 public class InlongConstants {
 
+    /**
+     * Thread pool related config.
+     */
+    public static final int CORE_POOL_SIZE = 10;
+    public static final int MAX_POOL_SIZE = 20;
+    public static final long ALIVE_TIME_MS = 100L;
+    public static final int QUEUE_SIZE = 10000;
+
     /**
      * Group config
      */
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
index 0cccd1589..e6f3b0d3a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterStatus.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.common.enums;
 
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
-
 /**
  * Status enum of cluster
  */
@@ -28,7 +26,7 @@ public enum ClusterStatus {
 
     INITING(2);
 
-    int status;
+    final int status;
 
     ClusterStatus(int status) {
         this.status = status;
@@ -37,15 +35,4 @@ public enum ClusterStatus {
     public int getStatus() {
         return status;
     }
-
-    public static ClusterStatus fromStatus(int status) {
-        switch (status) {
-            case 1:
-                return NORMAL;
-            case 2:
-                return INITING;
-            default:
-                throw new WorkflowException("unknown status: " + status);
-        }
-    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index dafcfe747..e60b898d5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -49,7 +49,7 @@ public enum GroupStatus {
     DELETING(41, "deleting"),
     DELETED(40, "deleted"),
 
-    // GROUP_FINISH is used for batch task.
+    // FINISH is used for batch task.
     FINISH(131, "finish");
 
     private static final Map<GroupStatus, Set<GroupStatus>> GROUP_STATE_AUTOMATON = Maps.newHashMap();
@@ -60,8 +60,7 @@ public enum GroupStatus {
     static {
         GROUP_STATE_AUTOMATON.put(DRAFT, Sets.newHashSet(DRAFT, TO_BE_SUBMIT, DELETING));
         GROUP_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
-        GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL,
-                Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED, DELETING));
+        GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED));
 
         GROUP_STATE_AUTOMATON.put(APPROVE_REJECTED, Sets.newHashSet(APPROVE_REJECTED, TO_BE_APPROVAL, DELETING));
         GROUP_STATE_AUTOMATON.put(APPROVE_PASSED, Sets.newHashSet(APPROVE_PASSED, CONFIG_ING, DELETING));
@@ -149,6 +148,7 @@ public enum GroupStatus {
 
     @Override
     public String toString() {
-        return name().toLowerCase(Locale.ROOT).replace("group_", "");
+        return name().toLowerCase(Locale.ROOT);
     }
+
 }
diff --git a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
index 4a5e7b804..3cbb57840 100644
--- a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
+++ b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/RestTemplateConfig.java
@@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -79,8 +80,8 @@ public class RestTemplateConfig {
     public PoolingHttpClientConnectionManager httpClientConnectionManager() {
         // Support HTTP, HTTPS
         Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
-                .register("http", PlainConnectionSocketFactory.getSocketFactory())
-                .register("https", SSLConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
                 .build();
         PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
                 registry);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
index 51109e2c7..bbc9c64fd 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
@@ -17,71 +17,61 @@
 
 package org.apache.inlong.manager.plugin.flink;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Task run service.
  */
 public class TaskRunService {
 
-    private static final ExecutorService executorService;
-
-    private static final int CORE_POOL_SIZE = 16;
-    private static final int MAXIMUM_POOL_SIZE = 32;
-    private static final int QUEUE_SIZE = 10000;
-    private static final long KEEP_ALIVE_TIME = 0L;
-
-    static {
-        executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
-                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<Runnable>(QUEUE_SIZE));
-    }
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
+            new ThreadFactoryBuilder().setNameFormat("inlong-plugin-%s").build(),
+            new CallerRunsPolicy());
 
     /**
      * execute
-     *
-     * @param runnable
      */
     public static void execute(Runnable runnable) {
-        executorService.execute(runnable);
+        EXECUTOR_SERVICE.execute(runnable);
     }
 
     /**
      * submit
-     *
-     * @param runnable
-     * @return
      */
     public static Future<?> submit(Runnable runnable) {
-        return executorService.submit(runnable);
+        return EXECUTOR_SERVICE.submit(runnable);
     }
 
     /**
      * submit
-     *
-     * @param runnable
-     * @param defaultValue
-     * @param <T>
-     * @return
      */
     public static <T> Future<T> submit(Runnable runnable, T defaultValue) {
-        return executorService.submit(runnable, defaultValue);
+        return EXECUTOR_SERVICE.submit(runnable, defaultValue);
     }
 
     /**
      * submit
-     *
-     * @param callable
-     * @param <T>
-     * @return
      */
     public static <T> Future<T> submit(Callable<T> callable) {
-        return executorService.submit(callable);
+        return EXECUTOR_SERVICE.submit(callable);
     }
 
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 6e65f30dc..7eae0e9ca 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -42,7 +42,7 @@ import java.util.Map;
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
- * Listener of delete sort.
+ * Listener of delete Sort task or config.
  */
 @Slf4j
 public class DeleteSortListener implements SortOperateListener {
@@ -57,17 +57,17 @@ public class DeleteSortListener implements SortOperateListener {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add delete group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+            log.info("not add delete group listener, not GroupResourceProcessForm for groupId={}", groupId);
             return false;
         }
 
         GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
         if (groupProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
-            log.info("not add delete group listener, as the operate was not DELETE for groupId [{}]", groupId);
+            log.info("not add delete group listener, as the operate was not DELETE for groupId={}", groupId);
             return false;
         }
 
-        log.info("add delete group listener for groupId [{}]", groupId);
+        log.info("add delete group listener for groupId={}", groupId);
         return true;
     }
 
@@ -76,7 +76,7 @@ public class DeleteSortListener implements SortOperateListener {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
+            String message = String.format("process form was not GroupResourceProcessForm for groupId=%s", groupId);
             log.error(message);
             return ListenerResult.fail(message);
         }
@@ -90,10 +90,8 @@ public class DeleteSortListener implements SortOperateListener {
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
+            log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId);
+            return ListenerResult.success();
         }
 
         Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
@@ -102,7 +100,7 @@ public class DeleteSortListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s]", groupId);
+            String message = String.format("sort job id is empty for groupId=%s", groupId);
             return ListenerResult.fail(message);
         }
 
@@ -115,16 +113,16 @@ public class DeleteSortListener implements SortOperateListener {
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
             flinkOperation.delete(flinkInfo);
-            log.info("job delete success for [{}]", jobId);
+            log.info("job delete success for jobId={}", jobId);
             return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("delete sort failed for groupId [%s] ", groupId);
+            String message = String.format("delete sort failed for groupId=%s", groupId);
             log.error(message, e);
-            return ListenerResult.fail(message + e.getMessage());
+            return ListenerResult.fail(message + ": " + e.getMessage());
         }
     }
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 7c50d9738..64006410b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -59,19 +59,19 @@ public class DeleteStreamListener implements SortOperateListener {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add delete stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            log.info("not add delete stream listener, not StreamResourceProcessForm for groupId={}", groupId);
             return false;
         }
 
         StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
         String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
         if (streamProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
-            log.info("not add delete stream listener, as the operate was not DELETE for groupId [{}] streamId [{}]",
+            log.info("not add delete stream listener, as the operate was not DELETE for groupId={} streamId={}",
                     groupId, streamId);
             return false;
         }
 
-        log.info("add delete stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        log.info("add delete stream listener for groupId={} streamId={}", groupId, streamId);
         return true;
     }
 
@@ -81,26 +81,23 @@ public class DeleteStreamListener implements SortOperateListener {
         StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
         InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
         List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
-        log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+        log.info("inlong group: {} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+
         InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
         List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
-        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+        log.info("inlong stream: {} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
 
         Map<String, String> kvConf = new HashMap<>();
         groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.forEach(extInfo -> {
-            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
-        });
+        streamExtList.forEach(extInfo -> kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue()));
 
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format(
-                    "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+            log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty",
                     groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
+            return ListenerResult.success();
         }
 
         Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
@@ -109,7 +106,7 @@ public class DeleteStreamListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
+            String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId);
             return ListenerResult.fail(message);
         }
 
@@ -122,14 +119,14 @@ public class DeleteStreamListener implements SortOperateListener {
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
             flinkOperation.delete(flinkInfo);
-            log.info("job delete success for [{}]", jobId);
+            log.info("job delete success for jobId={}", jobId);
             return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("delete sort failed for groupId [%s] streamId [%s]", groupId, streamId);
+            String message = String.format("delete sort failed for groupId=%s streamId=%s", groupId, streamId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 25052df49..c3daefecf 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -101,7 +101,7 @@ public class FlinkUtils {
 
         File baseDir = new File(baseDirName);
         if (!baseDir.exists() || !baseDir.isDirectory()) {
-            log.error("baseDirName find fail :{}", baseDirName);
+            log.error("baseDirName find fail: {}", baseDirName);
             return result;
         }
         String tempName;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index 21604f777..6363a8ec1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -145,26 +145,25 @@ public class MySQLSinkDTO {
         }
 
         String connUri = jdbcUrl.substring(pos1 + 1);
-        int pos;
         if (connUri.startsWith("//")) {
-            if ((pos = connUri.indexOf('/', 2)) != -1) {
+            int pos = connUri.indexOf('/', 2);
+            if (pos != -1) {
                 database = connUri.substring(pos + 1);
             }
         } else {
             database = connUri;
         }
 
+        if (Strings.isNullOrEmpty(database)) {
+            throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl);
+        }
+
         if (database.contains("?")) {
             database = database.substring(0, database.indexOf("?"));
         }
-
         if (database.contains(";")) {
             database = database.substring(0, database.indexOf(";"));
         }
-
-        if (Strings.isNullOrEmpty(database)) {
-            throw new IllegalArgumentException("Invalid JDBC url.");
-        }
         return database;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
index 6b0a02fc5..7f5469aa6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
@@ -34,64 +34,76 @@ import java.util.List;
  * Loader for sort service to load configs thought Cursor
  */
 public interface SortConfigLoader {
+
     /**
      * Load all clusters by cursor
-     * @return List of clusters, including MQ cluster and DataNode cluster.
+     *
+     * @return list of clusters, including MQ cluster and DataProxy cluster
      */
     List<SortSourceClusterInfo> loadAllClusters();
 
     /**
      * Load stream sinks by cursor
-     * @return List of Stream sinks.
+     *
+     * @return list of stream sinks
      */
     List<SortSourceStreamSinkInfo> loadAllStreamSinks();
 
     /**
      * Load groups by cursor
-     * @return List of group info
+     *
+     * @return list of group info
      */
     List<SortSourceGroupInfo> loadAllGroup();
 
     /**
-     * Load group backup info by cursor
-     * @param keyName Key name
-     * @return List of group backup info
+     * Load backup group info by cursor
+     *
+     * # @param keyName key name
+     *
+     * @return list of backup group info
      */
     List<InlongGroupExtEntity> loadGroupBackupInfo(String keyName);
 
     /**
-     * Load stream backup info by cursor
-     * @param keyName Key name
-     * @return List of stream backup info
+     * Load backup stream info by cursor
+     *
+     * @param keyName key name
+     * @return list of backup stream info
      */
     List<InlongStreamExtEntity> loadStreamBackupInfo(String keyName);
 
     /**
      * Load all inlong stream info by cursor
-     * @return List of stream info
+     *
+     * @return list of stream info
      */
     List<SortSourceStreamInfo> loadAllStreams();
 
     /**
      * Load all inlong stream sink entity by cursor
+     *
      * @return List of stream sink entity
      */
     List<StreamSinkEntity> loadAllStreamSinkEntity();
 
     /**
      * Load all task info
+     *
      * @return List of tasks
      */
     List<SortTaskInfo> loadAllTask();
 
     /**
      * Load all data node entity
+     *
      * @return List of data node
      */
     List<DataNodeEntity> loadAllDataNodeEntity();
 
     /**
      * Load all fields info
+     *
      * @return List of fields info
      */
     List<SortFieldInfo> loadAllFields();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
deleted file mode 100644
index 3b819f898..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Value;
-
-public abstract class AbstractService<T> implements AutoCloseable, InitializingBean {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractService.class);
-
-
-    @Value("${msg.to.db.batch.size:10}")
-    private int batchSize = 10;
-
-    @Value("${msg.to.db.queue.size:10000}")
-    private int queueSize = 10000;
-
-    @Value("${msg.to.db.core.pool.size:2}")
-    private int corePoolSize = 2;
-
-    @Value("${msg.to.db.max.pool.size:2}")
-    private int maximumPoolSize = 2;
-
-    @Value("${msg.to.db.queue.pool.size:10}")
-    private int syncSendQueueSize = 10;
-
-    private volatile boolean isClose = false;
-
-    private LinkedBlockingQueue<T> dataQueue;
-
-    private ThreadPoolExecutor pool;
-
-    /**
-     * batch insert entities
-     *
-     * @param entryList entryList
-     * @return boolean true/false
-     */
-    abstract boolean batchInsertEntities(List<T> entryList);
-
-    /**
-     * put Data
-     *
-     * @param data data
-     * @return boolean true/false
-     */
-    public boolean putData(T data) {
-        if (dataQueue == null) {
-            return false;
-        }
-        return dataQueue.offer(data);
-    }
-
-    @Override
-    public void close() {
-        isClose = true;
-    }
-
-    @Override
-    public void afterPropertiesSet() {
-        dataQueue = new LinkedBlockingQueue(queueSize);
-        pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
-                60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(syncSendQueueSize),
-                Executors.defaultThreadFactory());
-        for (int i = 0; i < corePoolSize; i++) {
-            pool.execute(new Task());
-        }
-    }
-
-    class Task implements Runnable {
-        @Override
-        public void run() {
-            while (!isClose) {
-                try {
-                    List<T> entryList = new ArrayList<>();
-                    int count = 0;
-                    while (count < batchSize) {
-                        T data = dataQueue.poll(1, TimeUnit.MILLISECONDS);
-                        if (data != null) {
-                            entryList.add(data);
-                        }
-                        count++;
-                    }
-                    if (CollectionUtils.isNotEmpty(entryList)) {
-                        batchInsertEntities(entryList);
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("batchInsertEntities has exception = {}", e);
-                }
-            }
-        }
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index 8d950b706..de044f0aa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -48,6 +48,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operation to the inlong group process
  */
@@ -56,12 +61,12 @@ public class InlongGroupProcessService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessService.class);
 
-    private final ExecutorService executorService = new ThreadPoolExecutor(
-            20,
-            40,
-            0L,
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("inlong-group-process-%s").build(),
             new CallerRunsPolicy());
 
@@ -107,7 +112,7 @@ public class InlongGroupProcessService {
         groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
-        executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+        EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
 
         LOGGER.info("success to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
@@ -147,7 +152,7 @@ public class InlongGroupProcessService {
         groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
-        executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+        EXECUTOR_SERVICE.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
 
         LOGGER.info("success to restart process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
@@ -181,7 +186,7 @@ public class InlongGroupProcessService {
      */
     public String deleteProcessAsync(String groupId, String operator) {
         LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
-        executorService.execute(() -> {
+        EXECUTOR_SERVICE.execute(() -> {
             try {
                 invokeDeleteProcess(groupId, operator);
             } catch (Exception ex) {
@@ -255,7 +260,7 @@ public class InlongGroupProcessService {
             List<WorkflowProcessEntity> entities = workflowQueryService.listProcessEntity(processQuery);
             entities.sort(Comparator.comparingInt(WorkflowProcessEntity::getId));
             WorkflowProcessEntity lastProcess = entities.get(entities.size() - 1);
-            executorService.execute(() -> {
+            EXECUTOR_SERVICE.execute(() -> {
                 workflowService.continueProcess(lastProcess.getId(), operator, "Reset group status");
             });
             return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 010afee02..16a96a8f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -79,7 +79,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
                     }
                 }).build();
 
-        // The expire time of cluster info cache must be greater than heartbeat cache
+        // The expiry time of cluster info cache must be greater than heartbeat cache
         // because the eviction handler needs to query cluster info cache
         clusterInfoCache = Caffeine.newBuilder()
                 .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 390c11b48..152832ba1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -48,6 +48,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
 import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT;
 import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE;
 
@@ -59,21 +63,23 @@ import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_R
 @Service
 public class QueueResourceListener implements QueueOperateListener {
 
+    private static final Integer TIMEOUT_SECONDS = 180;
+
     private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
-            20,
-            40,
-            10L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(),
-            new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
+            new ThreadFactoryBuilder().setNameFormat("inlong-mq-process-%s").build(),
             new CallerRunsPolicy());
 
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private QueueResourceOperatorFactory queueOperatorFactory;
-    @Autowired
     private WorkflowService workflowService;
+    @Autowired
+    private QueueResourceOperatorFactory queueOperatorFactory;
 
     @Override
     public TaskEvent event() {
@@ -152,7 +158,9 @@ public class QueueResourceListener implements QueueOperateListener {
                         }
                     });
             try {
-                future.get(180, TimeUnit.SECONDS);
+                // wait for the current process complete before starting the next stream,
+                // otherwise, an exception is thrown and the next stream process will not be started.
+                future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
             } catch (Exception e) {
                 String msg = "failed to execute stream process in asynchronously ";
                 log.error(msg, e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 08a310ddb..39a3b78dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -77,15 +77,17 @@ public class SortConfigListener implements SortOperateListener {
 
         GroupOperateType operateType = form.getGroupOperateType();
         if (operateType == GroupOperateType.SUSPEND || operateType == GroupOperateType.DELETE) {
-            LOGGER.info("not build sort config for groupId={}, as the group operate type={}", groupId, operateType);
+            LOGGER.info("no need to build sort config for groupId={} as the operate type is {}", groupId, operateType);
             return ListenerResult.success();
         }
+
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
         if (CollectionUtils.isEmpty(streamInfos)) {
-            LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
+            LOGGER.warn("no need to build sort config for groupId={}, as not found any stream", groupId);
             return ListenerResult.success();
         }
+
         int sinkCount = streamInfos.stream()
                 .map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size())
                 .reduce(0, Integer::sum);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
index 85b8e7eb7..cf7bd4828 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/operationlog/OperationLogPool.java
@@ -36,6 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operation log thread pool
  */
@@ -43,24 +46,23 @@ import java.util.stream.IntStream;
 @Component
 public class OperationLogPool {
 
-    private static final int BUFFER_SIZE = 500;
-    private static final int MAX_WAIT_TIME_SECOND = 30;
-    private static final int MAX_QUEUE_SIZE = 10000;
+    private static final int TIMEOUT_SECOND = 30;
     private static final int THREAD_NUM = 3;
-    private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(
-            MAX_QUEUE_SIZE);
+    private static final int BUFFER_SIZE = 500;
+
+    private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(QUEUE_SIZE);
 
-    private final ExecutorService executorService = new ThreadPoolExecutor(
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
             THREAD_NUM,
             THREAD_NUM,
-            0L,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
-            new ThreadFactoryBuilder().setNameFormat("async-operation-log-%s").build(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
+            new ThreadFactoryBuilder().setNameFormat("inlong-operation-log-%s").build(),
             new CallerRunsPolicy());
 
     @Autowired
-    private OperationLogEntityMapper operationLogEntityMapper;
+    private OperationLogEntityMapper operationLogMapper;
 
     public static void publish(OperationLogEntity operation) {
         if (!OPERATION_POOL.offer(operation)) {
@@ -71,7 +73,7 @@ public class OperationLogPool {
     @PostConstruct
     public void init() {
         IntStream.range(0, THREAD_NUM).forEach(
-                i -> executorService.submit(this::saveOperationLog)
+                i -> EXECUTOR_SERVICE.submit(this::saveOperationLog)
         );
     }
 
@@ -79,14 +81,13 @@ public class OperationLogPool {
         List<OperationLogEntity> buffer = new ArrayList<>(BUFFER_SIZE);
         while (true) {
             buffer.clear();
-            int size = 0;
             try {
-                size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, MAX_WAIT_TIME_SECOND, TimeUnit.SECONDS);
+                int size = Queues.drain(OPERATION_POOL, buffer, BUFFER_SIZE, TIMEOUT_SECOND, TimeUnit.SECONDS);
                 if (buffer.isEmpty()) {
                     continue;
                 }
                 long startTime = System.currentTimeMillis();
-                operationLogEntityMapper.insertBatch(buffer);
+                operationLogMapper.insertBatch(buffer);
                 log.info("receive {} logs and saved cost {} ms", size, System.currentTimeMillis() - startTime);
             } catch (InterruptedException e) {
                 log.error("save operation log interrupted", e);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 2f65aa735..2157acb1d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -51,14 +51,14 @@ public class KafkaResourceOperators implements QueueResourceOperator {
      */
     private static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
 
-    @Autowired
-    private InlongClusterService clusterService;
     @Autowired
     private KafkaOperator kafkaOperator;
     @Autowired
     private InlongStreamService streamService;
     @Autowired
     private InlongConsumeService consumeService;
+    @Autowired
+    private InlongClusterService clusterService;
 
     @Override
     public boolean accept(String mqType) {
@@ -67,7 +67,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
+        log.info("skip to create kafka topic for groupId={}, just create in each inlong stream",
+                groupInfo.getInlongGroupId());
     }
 
     @Override
@@ -84,7 +85,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
                 return;
             }
             for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
+                this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource());
             }
         } catch (Exception e) {
             log.error("failed to delete kafka resource for groupId=" + groupId, e);
@@ -94,8 +95,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     }
 
     @Override
-    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            String operator) {
+    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
         Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
         Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
         Preconditions.checkNotNull(operator, "operator cannot be null");
@@ -117,8 +117,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     }
 
     @Override
-    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            String operator) {
+    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
         Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
         Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
 
@@ -141,7 +140,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
      * Create Kafka Topic and Subscription, and save the consumer group info.
      */
     private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception {
-        // 1. create kafka topic
+        // create Kafka topic
         ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
         kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
 
@@ -153,7 +152,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
         }
 
         // Kafka consumers do not need to register in advance
-        // 2. insert the consumer group info
+
+        // save the consumer group info for the Kafka topic
         String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
         Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup);
         log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
@@ -163,9 +163,8 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     /**
      * Delete Kafka Topic and Subscription, and delete the consumer group info.
      */
-    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String streamId) {
+    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String topicName) {
         ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        String topicName = groupInfo.getInlongGroupId() + "_" + streamId;
         kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, topicName);
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 3ebeeca37..b82074d74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -353,7 +353,7 @@ public class PulsarOperator {
                 LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                 if (count == RETRY_TIMES) {
                     LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
-                    throw new BusinessException("check if the subscription exists error");
+                    throw new BusinessException("check if the subscription exists error: " + e.getMessage());
                 }
             }
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 2e379b285..69a38ec6e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -44,7 +45,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -80,72 +80,67 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         Preconditions.checkNotNull(operator, "operator cannot be null");
 
         String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create pulsar resource for groupId={}", groupId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
 
         // get pulsar cluster via the inlong cluster tag from the inlong group
-        String clusterTag = groupInfo.getInlongClusterTag();
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-                String clusterName = pulsarCluster.getName();
                 // create pulsar tenant and namespace
                 String tenant = pulsarCluster.getTenant();
                 if (StringUtils.isEmpty(tenant)) {
                     tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
                 }
-                String namespace = groupInfo.getMqResource();
-                InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+
                 // if the group was not successful, need create tenant and namespace
                 if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
                     pulsarOperator.createTenant(pulsarAdmin, tenant);
-                    log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}",
-                            groupId, tenant, clusterName);
-                    pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
-                    log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
-                            groupId, namespace, clusterName);
+                    String namespace = groupInfo.getMqResource();
+                    pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace);
+
+                    log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
+                            groupId, tenant, namespace, pulsarCluster);
                 }
             } catch (Exception e) {
-                String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
-                        pulsarCluster.toString());
-                log.error(msg, e);
+                String msg = "failed to create pulsar resource for groupId=" + groupId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
                 throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
-        log.info("success to create pulsar resource for groupId={}", groupId);
+
+        log.info("success to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
         Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
-
         String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete pulsar resource for groupId={}", groupId);
-
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
-            String clusterName = clusterInfo.getName();
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
+
+        List<InlongStreamBriefInfo> streamInfos = streamService.getTopicList(groupId);
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            log.warn("skip to delete pulsar resource as no streams for groupId={}", groupId);
+            return;
+        }
+
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
-                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-                if (streamInfoList == null || streamInfoList.isEmpty()) {
-                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
-                            groupId, clusterName);
-                    return;
-                }
-                for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                    this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+                for (InlongStreamBriefInfo streamInfo : streamInfos) {
+                    this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
                 }
             } catch (Exception e) {
-                log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e);
-                throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
+                String msg = "failed to delete pulsar resource for groupId=" + groupId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
-        log.info("success to delete pulsar resource for groupId={}", groupId);
+
+        log.info("success to delete pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
     }
 
     @Override
@@ -156,22 +151,25 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         String groupId = streamInfo.getInlongGroupId();
         String streamId = streamInfo.getInlongStreamId();
-        log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to create pulsar resource for groupId={}, streamId={}, clusterTag={}",
+                groupId, streamId, clusterTag);
 
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+        // get pulsar cluster via the inlong cluster tag from the inlong group
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
                 // create pulsar topic and subscription
-                this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
-                this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster,
-                        streamInfo.getMqResource(), streamId);
+                String topicName = streamInfo.getMqResource();
+                this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, topicName);
+                this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, topicName, streamId);
+
+                log.info("success to create pulsar resource for groupId={}, streamId={}, topic={}, cluster={}",
+                        groupId, streamId, topicName, pulsarCluster);
             } catch (Exception e) {
-                String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
-                        groupId, streamId, pulsarCluster.getName());
-                log.error(msg, e);
+                String msg = "failed to create pulsar resource for groupId=" + groupId + ", streamId=" + streamId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
                 throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
@@ -186,25 +184,24 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         String groupId = streamInfo.getInlongGroupId();
         String streamId = streamInfo.getInlongStreamId();
-        log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);
-
-        List<PulsarClusterInfo> pulsarClusters =
-                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
-                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
-                        .collect(Collectors.toList());
-        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
-            String clusterName = clusterInfo.getName();
+        String clusterTag = groupInfo.getInlongClusterTag();
+        log.info("begin to delete pulsar resource for groupId={}, streamId={}, clusterTag={}",
+                groupId, streamId, clusterTag);
+
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
-                this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
-                log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}",
-                        groupId, streamId, clusterName);
+                this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+                log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}",
+                        groupId, streamId, streamInfo.getMqResource(), pulsarCluster);
             } catch (Exception e) {
-                String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s",
-                        groupId, streamId, clusterName);
-                log.error(msg, e);
-                throw new WorkflowListenerException(msg);
+                String msg = "failed to delete pulsar topic for groupId=" + groupId + ", streamId=" + streamId;
+                log.error(msg + ", cluster=" + pulsarCluster, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
         }
+
         log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
index 6cccff706..c2df68f05 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
@@ -17,17 +17,14 @@
 
 package org.apache.inlong.manager.service.resource.sink.es;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import lombok.Data;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
@@ -36,6 +33,9 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Elasticsearch config information, including host, port, etc.
  */
@@ -43,6 +43,8 @@ import org.springframework.stereotype.Component;
 @Component
 public class ElasticsearchConfig {
 
+    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
+    private static RestHighLevelClient highLevelClient;
     @Value("${es.index.search.hostname}")
     private String host;
     @Value("${es.index.search.port}")
@@ -54,10 +56,6 @@ public class ElasticsearchConfig {
     @Value("${es.auth.password}")
     private String password;
 
-    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);
-
-    private static RestHighLevelClient highLevelClient;
-
     /**
      * highLevelClient
      *
@@ -75,7 +73,7 @@ public class ElasticsearchConfig {
                     for (String host : hostArrays) {
                         if (StringUtils.isNotBlank(host)) {
                             host = host.trim();
-                            hosts.add(new HttpHost(host, port, "http"));
+                            hosts.add(new HttpHost(host, port, ProtocolType.HTTP));
                         }
                     }
                     RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
index 441779c5c..d463ca9ab 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLJdbcUtils.java
@@ -38,10 +38,8 @@ import java.util.Objects;
 public class MySQLJdbcUtils {
 
     private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql";
-
     private static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-
-    private static final Logger LOG = LoggerFactory.getLogger(MySQLJdbcUtils.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(MySQLJdbcUtils.class);
 
     /**
      * Get MySQL connection from the url and user.
@@ -52,46 +50,46 @@ public class MySQLJdbcUtils {
      * @return {@link Connection}
      * @throws Exception on get connection error
      */
-    public static Connection getConnection(final String url, final String user, final String password)
-            throws Exception {
+    public static Connection getConnection(String url, String user, String password) throws Exception {
         if (StringUtils.isBlank(url) || !url.startsWith(MYSQL_JDBC_PREFIX)) {
-            throw new Exception("MySQL server URL was invalid, it should start with jdbc:mysql");
+            throw new Exception("MySQL JDBC URL was invalid, it should start with jdbc:mysql");
         }
+
         Connection conn;
         try {
             Class.forName(MYSQL_DRIVER_CLASS);
             conn = DriverManager.getConnection(url, user, password);
         } catch (Exception e) {
-            final String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!";
-            LOG.error(errorMsg, e);
+            String errorMsg = "get MySQL connection error, please check MySQL JDBC url, username or password!";
+            LOGGER.error(errorMsg, e);
             throw new Exception(errorMsg + " other error msg: " + e.getMessage());
         }
         if (Objects.isNull(conn)) {
             throw new Exception("get MySQL connection failed, please contact administrator.");
         }
-        LOG.info("get MySQL connection success, url={}", url);
+        LOGGER.info("get MySQL connection success for url={}", url);
         return conn;
     }
 
     /**
      * Execute SQL command on MySQL.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param sql SQL string to be executed
+     * @param conn JDBC {@link Connection}
+     * @param sql SQL to be executed
      * @throws Exception on execute SQL error
      */
     public static void executeSql(final Connection conn, final String sql) throws Exception {
         try (Statement stmt = conn.createStatement()) {
             stmt.execute(sql);
-            LOG.info("execute sql [{}] success", sql);
+            LOGGER.info("execute sql [{}] success", sql);
         }
     }
 
     /**
      * Execute batch query SQL on MySQL.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param sqls SQL string to be executed
+     * @param conn JDBC {@link Connection}
+     * @param sqls SQL to be executed
      * @throws Exception on get execute SQL batch error
      */
     public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception {
@@ -101,7 +99,7 @@ public class MySQLJdbcUtils {
                 stmt.execute(entry);
             }
             conn.commit();
-            LOG.info("execute sql [{}] success", sqls);
+            LOGGER.info("execute sql [{}] success", sqls);
         } finally {
             conn.setAutoCommit(true);
         }
@@ -110,7 +108,7 @@ public class MySQLJdbcUtils {
     /**
      * Create MySQL database
      *
-     * @param conn JDBC Connection  {@link Connection}
+     * @param conn JDBC {@link Connection}
      * @param dbName database name
      * @throws Exception on create database error
      */
@@ -118,58 +116,58 @@ public class MySQLJdbcUtils {
         if (!checkDbExist(conn, dbName)) {
             final String createDbSql = MySQLSqlBuilder.buildCreateDbSql(dbName);
             executeSql(conn, createDbSql);
-            LOG.info("execute sql [{}] success", createDbSql);
+            LOGGER.info("execute sql [{}] success", createDbSql);
         } else {
-            LOG.info("The database [{}] are exists", dbName);
+            LOGGER.info("The database [{}] are exists", dbName);
         }
     }
 
     /**
      * Check database from the MySQL information_schema.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
      * @return true if table exist, otherwise false
      * @throws Exception on check database exist error
      */
     public static boolean checkDbExist(final Connection conn, final String dbName) throws Exception {
-        boolean result = false;
         final String checkDbSql = MySQLSqlBuilder.getCheckDatabase(dbName);
         try (Statement stmt = conn.createStatement();
                 ResultSet resultSet = stmt.executeQuery(checkDbSql)) {
             if (Objects.nonNull(resultSet)) {
                 if (resultSet.next()) {
+                    LOGGER.info("check db exist for db={}, result=true", dbName);
                     return true;
                 }
             }
         }
-        LOG.info("check db exist for db={}, result={}", dbName, result);
-        return result;
+        LOGGER.info("check db exist for db={}, result=false", dbName);
+        return false;
     }
 
     /**
      * Create MySQL table by MySQLTableInfo
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param tableInfo MySQL table info  {@link MySQLTableInfo}
+     * @param conn JDBC {@link Connection}
+     * @param tableInfo table info  {@link MySQLTableInfo}
      * @throws Exception on create table error
      */
     public static void createTable(final Connection conn, final MySQLTableInfo tableInfo) throws Exception {
         if (checkTablesExist(conn, tableInfo.getDbName(), tableInfo.getTableName())) {
-            LOG.info("The table [{}] are exists", tableInfo.getTableName());
+            LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
         } else {
             final String createTableSql = MySQLSqlBuilder.buildCreateTableSql(tableInfo);
             executeSql(conn, createTableSql);
-            LOG.info("execute sql [{}] success", createTableSql);
+            LOGGER.info("execute sql [{}] success", createTableSql);
         }
     }
 
     /**
      * Check tables from the MySQL information_schema.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
      * @return true if table exist, otherwise false
      * @throws Exception on check table exist error
      */
@@ -185,23 +183,22 @@ public class MySQLJdbcUtils {
                 }
             }
         }
-        LOG.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
+        LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
         return result;
     }
 
     /**
-     * Check whether the column exists in the table.
+     * Check whether the column exists in the MySQL table.
      *
      * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
-     * @param column MySQL table column name
+     * @param dbName database name
+     * @param tableName table name
+     * @param column table column name
      * @return true if column exist in the table, otherwise false
      * @throws Exception on check column exist error
      */
     public static boolean checkColumnExist(final Connection conn, final String dbName, final String tableName,
-            final String column)
-            throws Exception {
+            final String column) throws Exception {
         boolean result = false;
         final String checkTableSql = MySQLSqlBuilder.getCheckColumn(dbName, tableName, column);
         try (Statement stmt = conn.createStatement();
@@ -212,16 +209,16 @@ public class MySQLJdbcUtils {
                 }
             }
         }
-        LOG.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
+        LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column);
         return result;
     }
 
     /**
-     * Query all columns of the tableName.
+     * Query all MySQL table columns by the given tableName.
      *
-     * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
      * @return {@link List}
      * @throws Exception on get columns error
      */
@@ -247,9 +244,9 @@ public class MySQLJdbcUtils {
      * Add columns for MySQL table.
      *
      * @param conn JDBC Connection  {@link Connection}
-     * @param dbName MySQL database name
-     * @param tableName MySQL table name
-     * @param columns MySQL columns to be added
+     * @param dbName database name
+     * @param tableName table name
+     * @param columns columns to be added
      * @throws Exception on add columns error
      */
     public static void addColumns(final Connection conn, final String dbName, final String tableName,
@@ -264,4 +261,5 @@ public class MySQLJdbcUtils {
         final List<String> addColumnSql = MySQLSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos);
         executeSqlBatch(conn, addColumnSql);
     }
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index d60232ac0..6c859cf2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.resource.sink.mysql;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -90,19 +90,19 @@ public class MySQLResourceOperator implements SinkResourceOperator {
             columnList.add(columnInfo);
         }
 
-        final MySQLSinkDTO mySQLSink = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
-        final MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(mySQLSink, columnList);
-
-        try (Connection conn = MySQLJdbcUtils.getConnection(mySQLSink.getJdbcUrl(), mySQLSink.getUsername(),
-                mySQLSink.getPassword())) {
+        MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+        MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList);
+        try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
+                sinkDTO.getPassword())) {
             // 1. create database if not exists
             MySQLJdbcUtils.createDb(conn, tableInfo.getDbName());
             // 2. table not exists, create it
             MySQLJdbcUtils.createTable(conn, tableInfo);
             // 3. table exists, add columns - skip the exists columns
             MySQLJdbcUtils.addColumns(conn, tableInfo.getDbName(), tableInfo.getTableName(), columnList);
+
             // 4. update the sink status to success
-            final String info = "success to create MySQL resource";
+            String info = "success to create MySQL resource";
             sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
             LOG.info(info + " for sinkInfo={}", sinkInfo);
         } catch (Throwable e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 2449b835e..08ec23226 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -78,16 +78,16 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
             throws Exception {
         if (isStream) {
-            LOGGER.warn("stream workflow no need to build sort config for disable zk");
+            LOGGER.warn("no need to build sort config for stream process when disable zk");
             return;
         }
         if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
-            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
+            LOGGER.warn("no need to build sort config as the group is null or streams is empty when disable zk");
             return;
         }
 
-        GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
-        String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
+        GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos);
+        String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo);
         this.addToGroupExt(groupInfo, dataflow);
 
         if (LOGGER.isDebugEnabled()) {
@@ -127,9 +127,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
                 if (CollectionUtils.isNotEmpty(transformResponses)) {
                     relations = NodeRelationUtils.createNodeRelations(inlongStream);
-
-                    // in standard mode, replace upstream source node and transform input fields node to mq node
-                    // mq node name, which is inlong stream id
+                    // in standard mode, replace upstream source node and transform input fields node
+                    // to MQ node (which is inlong stream id)
                     String mqNodeName = sources.get(0).getSourceName();
                     Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses);
                     adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 6247822b3..a3e68b8b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -236,7 +236,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         }
     }
 
-    private void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
+    protected void updateFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
         Integer sourceId = entity.getId();
         if (CollectionUtils.isEmpty(fieldInfos)) {
             return;
@@ -270,7 +270,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         streamFieldMapper.insertAll(list);
     }
 
-    private void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
+    protected void saveFieldOpt(StreamSourceEntity entity, List<StreamField> fieldInfos) {
         LOGGER.info("begin to save source fields={}", fieldInfos);
         if (CollectionUtils.isEmpty(fieldInfos)) {
             return;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
index ddcdc0dde..e94cf2276 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperator.java
@@ -36,7 +36,6 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -44,6 +43,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operate the source snapshot
  */
@@ -51,14 +55,15 @@ import java.util.concurrent.TimeUnit;
 public class SourceSnapshotOperator implements AutoCloseable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperator.class);
-    private final ExecutorService executorService = new ThreadPoolExecutor(
-            1,
-            1,
-            10L,
-            TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(100),
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(),
             new CallerRunsPolicy());
+
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
 
@@ -101,7 +106,7 @@ public class SourceSnapshotOperator implements AutoCloseable {
             snapshotQueue = new LinkedBlockingQueue<>(queueSize);
         }
         SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable();
-        this.executorService.execute(taskRunnable);
+        EXECUTOR_SERVICE.execute(taskRunnable);
         LOGGER.info("source snapshot operate thread started successfully");
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 12ebe6200..c4af55daa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -142,6 +142,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
                         && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
                     pulsarSource.setSerializationType(sourceInfo.getSerializationType());
                 }
+                // currently, only reuse the primary key from Kafka source
                 if (SourceType.KAFKA.equals(sourceInfo.getSourceType())) {
                     pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
                 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index eaa05ba41..43fff8cdd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -42,6 +42,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Operation related to inlong stream process
  */
@@ -49,12 +54,12 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class InlongStreamProcessService {
 
-    private final ExecutorService executorService = new ThreadPoolExecutor(
-            20,
-            40,
-            0L,
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
             new CallerRunsPolicy());
 
@@ -100,7 +105,7 @@ public class InlongStreamProcessService {
             ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(() -> workflowService.start(processName, operator, processForm));
+            EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
             return true;
         }
     }
@@ -143,7 +148,7 @@ public class InlongStreamProcessService {
             ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(() -> workflowService.start(processName, operator, processForm));
+            EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
             return true;
         }
     }
@@ -158,8 +163,7 @@ public class InlongStreamProcessService {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
         }
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
-        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
-                && groupStatus != GroupStatus.RESTARTED) {
+        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
             throw new BusinessException(
                     String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId));
         }
@@ -185,7 +189,7 @@ public class InlongStreamProcessService {
             ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(() -> workflowService.start(processName, operator, processForm));
+            EXECUTOR_SERVICE.execute(() -> workflowService.start(processName, operator, processForm));
             return true;
         }
     }
@@ -235,7 +239,7 @@ public class InlongStreamProcessService {
                 return false;
             }
         } else {
-            executorService.execute(() -> {
+            EXECUTOR_SERVICE.execute(() -> {
                 WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
                 ProcessStatus processStatus = workflowResult.getProcessInfo().getStatus();
                 if (processStatus == ProcessStatus.COMPLETED) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 2afa702ca..633d46fca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -504,7 +504,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     /**
      * Update field information
-     * <p/>First physically delete the existing field information, and then add the field information of this batch
+     * <p/>
+     * First physically delete the existing field information, and then add the field information of this batch
      */
     @Transactional(rollbackFor = Throwable.class)
     void updateField(String groupId, String streamId, List<StreamField> fieldList) {
@@ -525,7 +526,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
-        infoList.stream().forEach(streamField -> streamField.setId(null));
+        infoList.forEach(streamField -> streamField.setId(null));
         List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList,
                 InlongStreamFieldEntity::new);
         for (InlongStreamFieldEntity entity : list) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
index 9306364bc..d07626cae 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/RestTemplateConfig.java
@@ -29,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -79,8 +80,8 @@ public class RestTemplateConfig {
     public PoolingHttpClientConnectionManager httpClientConnectionManager() {
         // Support HTTP, HTTPS
         Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
-                .register("http", PlainConnectionSocketFactory.getSocketFactory())
-                .register("https", SSLConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
                 .build();
         PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
                 registry);
diff --git a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
index b02b1d211..862eba470 100644
--- a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
+++ b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
@@ -18,7 +18,7 @@
 #
 
 # Log level
-logging.level.root=INFO
+logging.level.root=info
 logging.level.org.apache.inlong.manager=debug
 
 spring.sql.init.platform=h2
@@ -31,7 +31,7 @@ spring.datasource.username=root
 spring.datasource.password=""
 
 # Audit configuration
-# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH]
+# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH|CLICKHOUSE]
 audit.query.source=MYSQL
 
 # Elasticsearch config
@@ -47,7 +47,7 @@ es.auth.user=admin
 es.auth.password=inlong
 
 # ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
 audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 # ClickHouse username
 audit.ck.username=default
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
index 9eae25d0e..3b0f6d107 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/openapi/OpenAPIFilter.java
@@ -62,7 +62,7 @@ public class OpenAPIFilter implements Filter {
             SecretToken token = parseBasicAuth(httpServletRequest);
             subject.login(token);
         } catch (Exception ex) {
-            LOGGER.error("login error, msg: {}", ex.getMessage());
+            LOGGER.error("login error: {}", ex.getMessage());
             ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage());
             return;
         }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
index 1e0b8c976..5408daefc 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/auth/web/AuthenticationFilter.java
@@ -70,7 +70,7 @@ public class AuthenticationFilter implements Filter {
             UsernamePasswordToken token = getPasswordToken(servletRequest);
             subject.login(token);
         } catch (Exception ex) {
-            LOGGER.error("login error, msg: {}", ex.getMessage());
+            LOGGER.error("login error: {}", ex.getMessage());
             ((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_FORBIDDEN, ex.getMessage());
             return;
         }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
index 68022eef1..79380d0d8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/config/RestTemplateConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.web.config;
 
-import java.nio.charset.StandardCharsets;
-import java.util.List;
 import lombok.Data;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -31,6 +29,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
+import org.apache.inlong.common.constant.ProtocolType;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -43,6 +42,9 @@ import org.springframework.util.ObjectUtils;
 import org.springframework.web.client.DefaultResponseErrorHandler;
 import org.springframework.web.client.RestTemplate;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
 @Data
 @Configuration
 @ConditionalOnMissingBean(RestTemplate.class)
@@ -84,8 +86,8 @@ public class RestTemplateConfig {
     public PoolingHttpClientConnectionManager httpClientConnectionManager() {
         // Support HTTP, HTTPS
         Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
-                .register("http", PlainConnectionSocketFactory.getSocketFactory())
-                .register("https", SSLConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTP, PlainConnectionSocketFactory.getSocketFactory())
+                .register(ProtocolType.HTTPS, SSLConnectionSocketFactory.getSocketFactory())
                 .build();
         PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(
                 registry);
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index b11bfdf1f..dfac2d818 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -44,6 +44,7 @@ spring.datasource.druid.testOnReturn=false
 spring.datasource.druid.filters=stat,wall
 # Open the mergeSql function through the connectProperties property, Slow SQL records
 spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
 # Exclude ElasticsearchRestClientAutoConfiguration
 spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
 
@@ -64,7 +65,7 @@ es.auth.user=admin
 es.auth.password=inlong
 
 # ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
 audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 # ClickHouse username
 audit.ck.username=default
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 203124fc2..53dd907c1 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -32,7 +32,7 @@ spring.datasource.druid.initialSize=20
 spring.datasource.druid.minIdle=20
 spring.datasource.druid.maxActive=300
 # Configure the timeout period to wait for the connection to be acquired
-spring.datasource.druid.maxWait=6000
+spring.datasource.druid.maxWait=600000
 # Configure the minimum survival time of a connection in the pool, in milliseconds
 spring.datasource.druid.minEvictableIdleTimeMillis=3600000
 # Detect when applying for connection. It is recommended to configure it to true, which does not affect performance and ensures safety
@@ -45,6 +45,7 @@ spring.datasource.druid.testOnReturn=false
 spring.datasource.druid.filters=stat,wall
 # Open the mergeSql function through the connectProperties property, Slow SQL records
 spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
 # Exclude ElasticsearchRestClientAutoConfiguration
 spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
 
@@ -65,7 +66,7 @@ es.auth.user=admin
 es.auth.password=inlong
 
 # ClickHouse config
-# ClickHouse url
+# ClickHouse jdbcUrl
 audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 # ClickHouse username
 audit.ck.username=default
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
index 581024506..2bb692925 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventListener.java
@@ -29,6 +29,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
+
 /**
  * Process event listener
  */
@@ -43,11 +48,11 @@ public interface ProcessEventListener extends EventListener<ProcessEvent> {
      * Async process common thread pool
      */
     ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
-            20,
-            40,
-            0L,
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
             TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new ThreadFactoryBuilder().setNameFormat("inlong-workflow-%s").build(),
             new CallerRunsPolicy());
 
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
index ac61cfb5b..d157e9ecb 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/ServiceTaskProcessor.java
@@ -105,21 +105,23 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         WorkflowContext.ActionContext actionContext = context.getActionContext();
         if (actionContext == null) {
             resetActionContext(context);
+            actionContext = context.getActionContext();
         }
-        WorkflowTaskEntity workflowTaskEntity = actionContext.getTaskEntity();
-        Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(workflowTaskEntity.getStatus())),
-                "task status should allow complete");
+        WorkflowTaskEntity taskEntity = actionContext.getTaskEntity();
+        Preconditions.checkTrue(ALLOW_COMPLETE_STATE.contains(TaskStatus.valueOf(taskEntity.getStatus())),
+                String.format("task status %s not allowed to complete", taskEntity.getStatus()));
+
         try {
             ListenerResult listenerResult = this.taskEventNotifier.notify(TaskEvent.COMPLETE, context);
             if (!listenerResult.isSuccess()) {
-                failedTask(context, workflowTaskEntity);
+                failedTask(context, taskEntity);
             } else {
-                completeTaskEntity(context, workflowTaskEntity, TaskStatus.COMPLETED);
+                completeTaskEntity(context, taskEntity, TaskStatus.COMPLETED);
             }
             return listenerResult.isSuccess();
         } catch (Exception e) {
-            log.error("Complete service task failed", e);
-            failedTask(context, workflowTaskEntity);
+            log.error("failed to complete service task: " + taskEntity, e);
+            failedTask(context, taskEntity);
             return false;
         }
     }
@@ -139,12 +141,14 @@ public class ServiceTaskProcessor extends AbstractTaskProcessor<ServiceTask> {
         taskQuery.setProcessId(processId);
         taskQuery.setName(serviceName);
         List<WorkflowTaskEntity> taskEntities = taskEntityMapper.selectByQuery(taskQuery);
+
         WorkflowTaskEntity taskEntity;
         if (CollectionUtils.isEmpty(taskEntities)) {
             taskEntity = saveTaskEntity(serviceTask, context);
         } else {
             taskEntity = taskEntities.get(0);
         }
+
         ActionContext actionContext = new WorkflowContext.ActionContext()
                 .setTask((WorkflowTask) context.getCurrentElement())
                 .setAction(WorkflowAction.COMPLETE)
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index d3b26786e..2b98103a8 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.workflow.processor;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.exceptions.JsonException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
@@ -40,11 +40,10 @@ import java.util.Date;
 /**
  * Start event handler
  */
+@Slf4j
 @Service
 public class StartEventProcessor extends AbstractNextableElementProcessor<StartEvent> {
 
-    @Autowired
-    private ObjectMapper objectMapper;
     @Autowired
     private ProcessEventNotifier processEventNotifier;
     @Autowired
@@ -61,13 +60,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         WorkflowProcess process = context.getProcess();
         ProcessForm form = context.getProcessForm();
         if (process.getFormClass() != null) {
-            Preconditions.checkNotNull(form, "form cannot be null");
+            Preconditions.checkNotNull(form, "process form cannot be null");
             Preconditions.checkTrue(form.getClass().isAssignableFrom(process.getFormClass()),
-                    "form type not match, should be class " + process.getFormClass());
+                    String.format("form type %s should match the process form type %s",
+                            form.getClass(), process.getFormClass()));
             form.validate();
         } else {
-            Preconditions.checkNull(form, "no form required");
+            log.warn("not need to provide the form info");
+            context.setProcessForm(null);
         }
+
         WorkflowProcessEntity processEntity = saveProcessEntity(applicant, process, form);
         context.setProcessEntity(processEntity);
         context.setActionContext(new WorkflowContext.ActionContext().setAction(WorkflowAction.START));
@@ -91,18 +93,16 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         processEntity.setDisplayName(process.getDisplayName());
         processEntity.setType(process.getType());
         processEntity.setTitle(form.getTitle());
+
         processEntity.setInlongGroupId(form.getInlongGroupId());
         if (form instanceof StreamResourceProcessForm) {
             StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
             processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId());
         }
+
         processEntity.setApplicant(applicant);
         processEntity.setStatus(ProcessStatus.PROCESSING.name());
-        try {
-            processEntity.setFormData(objectMapper.writeValueAsString(form));
-        } catch (Exception e) {
-            throw new JsonException("write form to json error: ", e);
-        }
+        processEntity.setFormData(JsonUtils.toJsonString(form));
         processEntity.setStartTime(new Date());
         processEntity.setHidden(process.getHidden());