You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/10 06:46:48 UTC

[inlong] branch master updated: [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c21437090 [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)
c21437090 is described below

commit c2143709048cc2a7a6f5dc50ea92575042654182
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Nov 10 14:46:44 2022 +0800

    [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)
    
    * Distinguish config processes between the InlongGroup and InlongStream
    
    * Use common thread pool for UpdateGroupCompleteListener
    
    * Modify the source state according to lightweight
    
    * Modify the asynchronous process logic of inlong stream
    
    * Allow configuration without inlong stream
    
    * Determine whether to configure Sort according to isStream param
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../manager/dao/entity/WorkflowProcessEntity.java  |  1 +
 .../mappers/WorkflowProcessEntityMapper.xml        | 27 +++++----
 .../manager/pojo/workflow/ProcessRequest.java      |  3 +
 .../manager/pojo/workflow/TaskLogRequest.java      |  3 +
 .../form/process/StreamResourceProcessForm.java    | 13 +++++
 .../listener/group/InitGroupCompleteListener.java  | 24 +++-----
 .../service/listener/group/InitGroupListener.java  |  4 --
 .../listener/queue/QueueResourceListener.java      | 67 ++++++++++++++++++++++
 .../queue/StreamQueueResourceListener.java         |  3 -
 .../service/listener/sort/SortConfigListener.java  |  5 ++
 .../stream/InitStreamCompleteListener.java         |  7 ++-
 .../queue/kafka/KafkaResourceOperators.java        | 25 +-------
 .../queue/pulsar/PulsarResourceOperator.java       | 16 +-----
 .../resource/sort/DefaultSortConfigOperator.java   | 10 ++--
 .../service/stream/InlongStreamProcessService.java | 20 +++----
 .../service/workflow/WorkflowServiceImpl.java      |  1 +
 .../group/CreateGroupWorkflowDefinition.java       | 24 +-------
 .../group/DeleteGroupWorkflowDefinition.java       |  8 ++-
 .../stream/CreateStreamWorkflowDefinition.java     |  4 +-
 .../stream/DeleteStreamWorkflowDefinition.java     |  4 +-
 .../group/CreateGroupWorkflowDefinitionTest.java   |  4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  3 +-
 .../workflow/processor/StartEventProcessor.java    |  5 ++
 24 files changed, 162 insertions(+), 122 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
index 87e92bf4a..b295e1e40 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
@@ -34,6 +34,7 @@ public class WorkflowProcessEntity {
     private String title;
 
     private String inlongGroupId;
+    private String inlongStreamId;
     private String applicant;
     private String status;
     private String formData;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
index 1969918b4..8b122ab12 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
@@ -27,6 +27,7 @@
         <result column="type" jdbcType="VARCHAR" property="type"/>
         <result column="title" jdbcType="VARCHAR" property="title"/>
         <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
         <result column="applicant" jdbcType="VARCHAR" property="applicant"/>
         <result column="status" jdbcType="VARCHAR" property="status"/>
         <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
@@ -36,22 +37,21 @@
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, name, display_name, type, title, inlong_group_id, applicant,
-        status, start_time, end_time, hidden, form_data, ext_params
+        id, name, display_name, type, title, inlong_group_id, inlong_stream_id,
+        applicant, status, start_time, end_time, hidden, form_data, ext_params
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id" keyColumn="id"
             parameterType="org.apache.inlong.manager.dao.entity.WorkflowProcessEntity">
-        insert into workflow_process (name, display_name,
-                                      type, title, inlong_group_id,
-                                      applicant, status,
-                                      start_time, end_time,
-                                      form_data, ext_params, hidden)
-        values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR},
-                #{type,jdbcType=VARCHAR}, #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR},
-                #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR},
-                #{startTime,jdbcType=TIMESTAMP}, #{endTime,jdbcType=TIMESTAMP},
-                #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{hidden,jdbcType=TINYINT})
+        insert into workflow_process (name, display_name, type,
+                                      title, inlong_group_id, inlong_stream_id,
+                                      applicant, status, start_time,
+                                      end_time, form_data, ext_params, hidden)
+        values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
+                #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+                #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR}, #{startTime,jdbcType=TIMESTAMP},
+                #{endTime,jdbcType=TIMESTAMP}, #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{hidden,jdbcType=TINYINT})
     </insert>
 
     <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -87,6 +87,9 @@
             <if test="inlongGroupId != null and inlongGroupId !=''">
                 and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
             </if>
+            <if test="inlongStreamId != null and inlongStreamId !=''">
+                and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+            </if>
             <if test="applicant != null and applicant !=''">
                 and applicant = #{applicant,jdbcType=VARCHAR}
             </if>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
index a9b5138e8..585de747a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
@@ -63,6 +63,9 @@ public class ProcessRequest extends PageRequest {
     @ApiModelProperty("Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty("Inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty("Start time-lower limit")
     @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date startTimeBegin;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
index 5f5011918..f3cc5a9b2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
@@ -36,6 +36,9 @@ public class TaskLogRequest extends PageRequest {
     @ApiModelProperty("Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty("Inlong stream id")
+    private String inlongStreamId;
+
     @ApiModelProperty("Process name list")
     private List<String> processNames;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
index 4c50e58e4..b175bc09d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
@@ -39,6 +39,18 @@ public class StreamResourceProcessForm extends BaseProcessForm {
 
     private GroupOperateType groupOperateType = GroupOperateType.INIT;
 
+    /**
+     * Build a StreamResourceProcessForm populated with the given group info, stream info and operate type.
+     */
+    public static StreamResourceProcessForm getProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            GroupOperateType operateType) {
+        StreamResourceProcessForm processForm = new StreamResourceProcessForm();
+        processForm.setGroupInfo(groupInfo);
+        processForm.setStreamInfo(streamInfo);
+        processForm.setGroupOperateType(operateType);
+        return processForm;
+    }
+
     @Override
     public void validate() throws FormValidateException {
 
@@ -53,4 +65,5 @@ public class StreamResourceProcessForm extends BaseProcessForm {
     public String getInlongGroupId() {
         return groupInfo.getInlongGroupId();
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index 944c740c7..ca5a375fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -18,27 +18,26 @@
 package org.apache.inlong.manager.service.listener.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
-import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * The listener of InlongGroup when created resources successfully.
  */
@@ -49,11 +48,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
-    private StreamSourceService sourceService;
-    @Autowired
     private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongStreamProcessService streamProcessService;
 
     @Override
     public ProcessEvent event() {
@@ -84,12 +81,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
         updateGroupRequest.setVersion(existGroup.getVersion());
         groupService.update(updateGroupRequest, operator);
 
-        // update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-            sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
-        } else {
-            sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        List<InlongStreamInfo> streamList = form.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamList) {
+            streamProcessService.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false);
         }
 
         log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
index d5c238fa8..fbef1c864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.listener.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -59,9 +58,6 @@ public class InitGroupListener implements ProcessEventListener {
         if (groupInfo == null) {
             throw new WorkflowListenerException("inlong group info cannot be null for init group process");
         }
-        if (CollectionUtils.isEmpty(form.getStreamInfos())) {
-            throw new WorkflowListenerException("inlong stream info list cannot be null for init group process");
-        }
         groupService.updateStatus(groupId, GroupStatus.CONFIG_ING.getCode(), context.getOperator());
 
         log.info("success to execute InitGroupListener for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index edba69c55..390c11b48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -17,22 +17,40 @@
 
 package org.apache.inlong.manager.service.listener.queue;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.enums.TaskStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.TaskResponse;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT;
+import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE;
+
 /**
  * Create message queue resources,
  * such as Pulsar Topic and Subscription, TubeMQ Topic and ConsumerGroup, etc.
@@ -41,10 +59,21 @@ import org.springframework.stereotype.Service;
 @Service
 public class QueueResourceListener implements QueueOperateListener {
 
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            20, // core pool size
+            40, // maximum pool size
+            10L, // keep-alive time for threads above the core size
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(), // NOTE(review): unbounded, so the pool should never exceed 20 threads
+            new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
+            new CallerRunsPolicy()); // NOTE(review): only reachable after shutdown while the queue is unbounded
+
     @Autowired
     private InlongGroupService groupService;
     @Autowired
     private QueueResourceOperatorFactory queueOperatorFactory;
+    @Autowired
+    private WorkflowService workflowService;
 
     @Override
     public TaskEvent event() {
@@ -82,7 +111,10 @@ public class QueueResourceListener implements QueueOperateListener {
         String operator = context.getOperator();
         switch (operateType) {
             case INIT:
+                // create queue resource for inlong group
                 queueOperator.createQueueForGroup(groupInfo, operator);
+                // create queue resource for all inlong streams under the inlong group
+                this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator);
                 break;
             case DELETE:
                 queueOperator.deleteQueueForGroup(groupInfo, operator);
@@ -96,4 +128,39 @@ public class QueueResourceListener implements QueueOperateListener {
         return ListenerResult.success("success");
     }
 
+    /**
+     * Start the CREATE_STREAM_RESOURCE process for each stream in turn, failing on the first stream that fails.
+     */
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        if (streamInfos == null || streamInfos.isEmpty()) {
+            log.warn("skip to start stream process for groupId={}, as the stream is empty", groupId);
+            return;
+        }
+        log.info("begin to start stream process for groupId={}", groupId);
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String errMsg = "failed to start stream process for groupId=" + groupId
+                    + " streamId=" + stream.getInlongStreamId();
+            CompletableFuture<WorkflowResult> future = CompletableFuture.supplyAsync(
+                    () -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE);
+            List<TaskResponse> tasks;
+            try {
+                tasks = future.get(180, TimeUnit.SECONDS).getNewTasks();
+            } catch (Exception e) {
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+                log.error(errMsg, e);
+                throw new WorkflowListenerException(errMsg + ": " + e.getMessage(), e);
+            }
+            // a result without tasks still counts as a failure, but is reported explicitly instead of by index error
+            if (tasks == null || tasks.isEmpty() || TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                log.error(errMsg);
+                throw new WorkflowListenerException(errMsg);
+            }
+        }
+        log.info("success to start stream process for groupId={}", groupId);
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
index 21403ec21..eb3aaab8c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
@@ -29,7 +29,6 @@ import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -46,8 +45,6 @@ public class StreamQueueResourceListener implements QueueOperateListener {
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
     private QueueResourceOperatorFactory queueOperatorFactory;
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index d04a20852..08a310ddb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.sort;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -81,6 +82,10 @@ public class SortConfigListener implements SortOperateListener {
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
+            return ListenerResult.success();
+        }
         int sinkCount = streamInfos.stream()
                 .map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size())
                 .reduce(0, Integer::sum);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index be6e60fd6..141635c46 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.listener.stream;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
@@ -63,7 +64,11 @@ public class InitStreamCompleteListener implements ProcessEventListener {
         // Update status of other related configs
         streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
         streamService.update(streamInfo.genRequest(), operator);
-        sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
+            sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+        } else {
+            sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        }
 
         return ListenerResult.success();
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 4e2b7246a..2f65aa735 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -54,10 +54,10 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
     private KafkaOperator kafkaOperator;
     @Autowired
+    private InlongStreamService streamService;
+    @Autowired
     private InlongConsumeService consumeService;
 
     @Override
@@ -67,26 +67,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 23c13b740..2e379b285 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -107,20 +107,6 @@ public class PulsarResourceOperator implements QueueResourceOperator {
                     log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
                             groupId, namespace, clusterName);
                 }
-
-                // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
-                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-                if (streamInfoList == null || streamInfoList.isEmpty()) {
-                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
-                            groupId, clusterName);
-                    return;
-                }
-                // create pulsar topic and subscription
-                for (InlongStreamBriefInfo stream : streamInfoList) {
-                    this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
-                    this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(),
-                            stream.getInlongStreamId());
-                }
             } catch (Exception e) {
                 String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
                         pulsarCluster.toString());
@@ -184,7 +170,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
                         streamInfo.getMqResource(), streamId);
             } catch (Exception e) {
                 String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
-                        groupId, streamId,pulsarCluster.getName());
+                        groupId, streamId, pulsarCluster.getName());
                 log.error(msg, e);
                 throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index a4c6b5d80..2449b835e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -77,6 +77,10 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     @Override
     public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
             throws Exception {
+        if (isStream) {
+            LOGGER.warn("stream workflow no need to build sort config for disable zk");
+            return;
+        }
         if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
             LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
             return;
@@ -84,11 +88,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 
         GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
         String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
-        if (isStream) {
-            this.addToStreamExt(streamInfos, dataflow);
-        } else {
-            this.addToGroupExt(groupInfo, dataflow);
-        }
+        this.addToGroupExt(groupInfo, dataflow);
 
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index 06046eceb..eaa05ba41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -92,7 +92,8 @@ public class InlongStreamProcessService {
             throw new BusinessException(errMsg);
         }
 
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo,
+                streamInfo, GroupOperateType.INIT);
         ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -134,7 +135,8 @@ public class InlongStreamProcessService {
                     String.format("stream status=%s not support suspend stream for groupId=%s streamId=%s",
                             status, groupId, streamId));
         }
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+                GroupOperateType.SUSPEND);
         ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -175,7 +177,8 @@ public class InlongStreamProcessService {
                     String.format("stream status=%s not support restart stream for groupId=%s streamId=%s",
                             status, groupId, streamId));
         }
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+                GroupOperateType.RESTART);
         ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -220,7 +223,8 @@ public class InlongStreamProcessService {
                     String.format("stream status=%s not support delete stream for groupId=%s streamId=%s",
                             status, groupId, streamId));
         }
-        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE);
+        StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+                GroupOperateType.DELETE);
         ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -242,12 +246,4 @@ public class InlongStreamProcessService {
         }
     }
 
-    private StreamResourceProcessForm genStreamProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            GroupOperateType operateType) {
-        StreamResourceProcessForm processForm = new StreamResourceProcessForm();
-        processForm.setGroupInfo(groupInfo);
-        processForm.setStreamInfo(streamInfo);
-        processForm.setGroupOperateType(operateType);
-        return processForm;
-    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index d3d6bac9a..b59842f41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -189,6 +189,7 @@ public class WorkflowServiceImpl implements WorkflowService {
 
         ProcessRequest processRequest = new ProcessRequest();
         processRequest.setInlongGroupId(groupId);
+        processRequest.setInlongStreamId(query.getInlongStreamId());
         processRequest.setNameList(processNameList);
         processRequest.setHidden(1);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 01d500bf0..94eb0c3fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -77,14 +77,6 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
         initMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initMQTask);
 
-        // Init Sink
-        ServiceTask initSinkTask = new ServiceTask();
-        initSinkTask.setName("InitSink");
-        initSinkTask.setDisplayName("Group-InitSink");
-        initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
-        initSinkTask.setListenerFactory(groupTaskListenerFactory);
-        process.addTask(initSinkTask);
-
         // Init Sort
         ServiceTask initSortTask = new ServiceTask();
         initSortTask.setName("InitSort");
@@ -93,25 +85,15 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
         initSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initSortTask);
 
-        // Init Source
-        ServiceTask initSourceTask = new ServiceTask();
-        initSourceTask.setName("InitSource");
-        initSourceTask.setDisplayName("Group-InitSource");
-        initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
-        initSourceTask.setListenerFactory(groupTaskListenerFactory);
-        process.addTask(initSourceTask);
-
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
-        // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source
+        // Task dependency order: 1.MQ -> 2.Sink-in-Stream -> 3.Sort -> 4.Source-in-Stream
         // To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort
         startEvent.addNext(initMQTask);
-        initMQTask.addNext(initSinkTask);
-        initSinkTask.addNext(initSortTask);
-        initSortTask.addNext(initSourceTask);
-        initSourceTask.addNext(endEvent);
+        initMQTask.addNext(initSortTask);
+        initSortTask.addNext(endEvent);
 
         return process;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 395a3dec9..5d3eb090e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.service.workflow.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.service.listener.group.UpdateGroupCompleteListener;
 import org.apache.inlong.manager.service.listener.group.UpdateGroupFailedListener;
 import org.apache.inlong.manager.service.listener.group.UpdateGroupListener;
-import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -93,6 +93,8 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
         deleteSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteSortTask);
 
+        // No need to delete the sink because we should not affect the existing data in the sink
+
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 79f154785..6ce7822b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory;
 import org.apache.inlong.manager.service.listener.stream.InitStreamCompleteListener;
 import org.apache.inlong.manager.service.listener.stream.InitStreamFailedListener;
 import org.apache.inlong.manager.service.listener.stream.InitStreamListener;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 7dd11ca4f..415d78f3f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -18,13 +18,13 @@
 package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory;
 import org.apache.inlong.manager.service.listener.stream.UpdateStreamCompleteListener;
 import org.apache.inlong.manager.service.listener.stream.UpdateStreamFailedListener;
 import org.apache.inlong.manager.service.listener.stream.UpdateStreamListener;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index 31c74a2cb..217d23841 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -37,11 +37,9 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
         WorkflowProcess cloneProcess1 = process.clone();
         WorkflowProcess cloneProcess2 = cloneProcess1.clone();
         Assertions.assertNotSame(cloneProcess2, cloneProcess1);
-        Assertions.assertNotNull(process.getTaskByName("InitSource"));
         Assertions.assertNotNull(process.getTaskByName("InitMQ"));
         Assertions.assertNotNull(process.getTaskByName("InitSort"));
-        Assertions.assertNotNull(process.getTaskByName("InitSink"));
-        Assertions.assertEquals(4, process.getNameToTaskMap().size());
+        Assertions.assertEquals(2, process.getNameToTaskMap().size());
     }
 
 }
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 951d854c9..53ece0c11 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -606,7 +606,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
     `display_name`    varchar(256) NOT NULL COMMENT 'Process display name',
     `type`            varchar(256)          DEFAULT NULL COMMENT 'Process classification',
     `title`           varchar(256)          DEFAULT NULL COMMENT 'Process title',
-    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
+    `inlong_stream_id`varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
     `applicant`       varchar(256) NOT NULL COMMENT 'Applicant',
     `status`          varchar(64)  NOT NULL COMMENT 'Status',
     `form_data`       mediumtext COMMENT 'Form information',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index fbd73961f..6f6a45fab 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -642,7 +642,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
     `display_name`    varchar(256) NOT NULL COMMENT 'Process display name',
     `type`            varchar(256)          DEFAULT NULL COMMENT 'Process classification',
     `title`           varchar(256)          DEFAULT NULL COMMENT 'Process title',
-    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+    `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
+    `inlong_stream_id`varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
     `applicant`       varchar(256) NOT NULL COMMENT 'Applicant',
     `status`          varchar(64)  NOT NULL COMMENT 'Status',
     `form_data`       mediumtext COMMENT 'Form information',
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index 9d989875e..d3b26786e 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.StartEvent;
@@ -91,6 +92,10 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         processEntity.setType(process.getType());
         processEntity.setTitle(form.getTitle());
         processEntity.setInlongGroupId(form.getInlongGroupId());
+        if (form instanceof StreamResourceProcessForm) {
+            StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+            processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId());
+        }
         processEntity.setApplicant(applicant);
         processEntity.setStatus(ProcessStatus.PROCESSING.name());
         try {