You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 13:45:34 UTC
[inlong] 01/04: [INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit b14dbfa7b74e82275cd9178dcb5973aa01469550
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Nov 10 14:46:44 2022 +0800
[INLONG-6044][Manager] Distinguish config processes between the InlongGroup and InlongStream (#6046)
* Distinguish config processes between the InlongGroup and InlongStream
* Use common thread pool for UpdateGroupCompleteListener
* Modify the source state according to lightweight
* Modify the asynchronous process logic of inlong stream
* Allow configuration without inlong stream
* Determine whether to configure Sort according to isStream param
Co-authored-by: healchow <he...@gmail.com>
---
.../manager/dao/entity/WorkflowProcessEntity.java | 1 +
.../mappers/WorkflowProcessEntityMapper.xml | 27 +++++----
.../manager/pojo/workflow/ProcessRequest.java | 3 +
.../manager/pojo/workflow/TaskLogRequest.java | 3 +
.../form/process/StreamResourceProcessForm.java | 13 +++++
.../listener/group/InitGroupCompleteListener.java | 24 +++-----
.../service/listener/group/InitGroupListener.java | 4 --
.../listener/queue/QueueResourceListener.java | 67 ++++++++++++++++++++++
.../queue/StreamQueueResourceListener.java | 3 -
.../service/listener/sort/SortConfigListener.java | 5 ++
.../stream/InitStreamCompleteListener.java | 7 ++-
.../queue/kafka/KafkaResourceOperators.java | 25 +-------
.../queue/pulsar/PulsarResourceOperator.java | 16 +-----
.../resource/sort/DefaultSortConfigOperator.java | 10 ++--
.../service/stream/InlongStreamProcessService.java | 20 +++----
.../service/workflow/WorkflowServiceImpl.java | 1 +
.../group/CreateGroupWorkflowDefinition.java | 24 +-------
.../group/DeleteGroupWorkflowDefinition.java | 8 ++-
.../stream/CreateStreamWorkflowDefinition.java | 4 +-
.../stream/DeleteStreamWorkflowDefinition.java | 4 +-
.../group/CreateGroupWorkflowDefinitionTest.java | 4 +-
.../main/resources/h2/apache_inlong_manager.sql | 3 +-
.../manager-web/sql/apache_inlong_manager.sql | 3 +-
.../workflow/processor/StartEventProcessor.java | 5 ++
24 files changed, 162 insertions(+), 122 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
index 87e92bf4a..b295e1e40 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/WorkflowProcessEntity.java
@@ -34,6 +34,7 @@ public class WorkflowProcessEntity {
private String title;
private String inlongGroupId;
+ private String inlongStreamId;
private String applicant;
private String status;
private String formData;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
index 1969918b4..8b122ab12 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
@@ -27,6 +27,7 @@
<result column="type" jdbcType="VARCHAR" property="type"/>
<result column="title" jdbcType="VARCHAR" property="title"/>
<result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
<result column="applicant" jdbcType="VARCHAR" property="applicant"/>
<result column="status" jdbcType="VARCHAR" property="status"/>
<result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
@@ -36,22 +37,21 @@
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
</resultMap>
<sql id="Base_Column_List">
- id, name, display_name, type, title, inlong_group_id, applicant,
- status, start_time, end_time, hidden, form_data, ext_params
+ id, name, display_name, type, title, inlong_group_id, inlong_stream_id,
+ applicant, status, start_time, end_time, hidden, form_data, ext_params
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id" keyColumn="id"
parameterType="org.apache.inlong.manager.dao.entity.WorkflowProcessEntity">
- insert into workflow_process (name, display_name,
- type, title, inlong_group_id,
- applicant, status,
- start_time, end_time,
- form_data, ext_params, hidden)
- values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR},
- #{type,jdbcType=VARCHAR}, #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR},
- #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR},
- #{startTime,jdbcType=TIMESTAMP}, #{endTime,jdbcType=TIMESTAMP},
- #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{hidden,jdbcType=TINYINT})
+ insert into workflow_process (name, display_name, type,
+ title, inlong_group_id, inlong_stream_id,
+ applicant, status, start_time,
+ end_time, form_data, ext_params, hidden)
+ values (#{name,jdbcType=VARCHAR}, #{displayName,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
+ #{title,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{applicant,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR}, #{startTime,jdbcType=TIMESTAMP},
+ #{endTime,jdbcType=TIMESTAMP}, #{formData,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+ #{hidden,jdbcType=TINYINT})
</insert>
<select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -87,6 +87,9 @@
<if test="inlongGroupId != null and inlongGroupId !=''">
and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
</if>
+ <if test="inlongStreamId != null and inlongStreamId !=''">
+ and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+ </if>
<if test="applicant != null and applicant !=''">
and applicant = #{applicant,jdbcType=VARCHAR}
</if>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
index a9b5138e8..585de747a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessRequest.java
@@ -63,6 +63,9 @@ public class ProcessRequest extends PageRequest {
@ApiModelProperty("Inlong group id")
private String inlongGroupId;
+ @ApiModelProperty("Inlong stream id")
+ private String inlongStreamId;
+
@ApiModelProperty("Start time-lower limit")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date startTimeBegin;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
index 5f5011918..f3cc5a9b2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskLogRequest.java
@@ -36,6 +36,9 @@ public class TaskLogRequest extends PageRequest {
@ApiModelProperty("Inlong group id")
private String inlongGroupId;
+ @ApiModelProperty("Inlong stream id")
+ private String inlongStreamId;
+
@ApiModelProperty("Process name list")
private List<String> processNames;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
index 4c50e58e4..b175bc09d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/StreamResourceProcessForm.java
@@ -39,6 +39,18 @@ public class StreamResourceProcessForm extends BaseProcessForm {
private GroupOperateType groupOperateType = GroupOperateType.INIT;
+ /**
+ * Get stream resource process form info.
+ */
+ public static StreamResourceProcessForm getProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+ GroupOperateType operateType) {
+ StreamResourceProcessForm processForm = new StreamResourceProcessForm();
+ processForm.setGroupInfo(groupInfo);
+ processForm.setStreamInfo(streamInfo);
+ processForm.setGroupOperateType(operateType);
+ return processForm;
+ }
+
@Override
public void validate() throws FormValidateException {
@@ -53,4 +65,5 @@ public class StreamResourceProcessForm extends BaseProcessForm {
public String getInlongGroupId() {
return groupInfo.getInlongGroupId();
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index 944c740c7..ca5a375fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -18,27 +18,26 @@
package org.apache.inlong.manager.service.listener.group;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
-import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+
/**
* The listener of InlongGroup when created resources successfully.
*/
@@ -49,11 +48,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@Autowired
- private InlongStreamService streamService;
- @Autowired
- private StreamSourceService sourceService;
- @Autowired
private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongStreamProcessService streamProcessService;
@Override
public ProcessEvent event() {
@@ -84,12 +81,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
updateGroupRequest.setVersion(existGroup.getVersion());
groupService.update(updateGroupRequest, operator);
- // update status of other related configs
- streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
- if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
- sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
- } else {
- sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ List<InlongStreamInfo> streamList = form.getStreamInfos();
+ for (InlongStreamInfo streamInfo : streamList) {
+ streamProcessService.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false);
}
log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
index d5c238fa8..fbef1c864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupListener.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.service.listener.group;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -59,9 +58,6 @@ public class InitGroupListener implements ProcessEventListener {
if (groupInfo == null) {
throw new WorkflowListenerException("inlong group info cannot be null for init group process");
}
- if (CollectionUtils.isEmpty(form.getStreamInfos())) {
- throw new WorkflowListenerException("inlong stream info list cannot be null for init group process");
- }
groupService.updateStatus(groupId, GroupStatus.CONFIG_ING.getCode(), context.getOperator());
log.info("success to execute InitGroupListener for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index edba69c55..390c11b48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -17,22 +17,40 @@
package org.apache.inlong.manager.service.listener.queue;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.enums.TaskStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.TaskResponse;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.manager.common.enums.GroupOperateType.INIT;
+import static org.apache.inlong.manager.common.enums.ProcessName.CREATE_STREAM_RESOURCE;
+
/**
* Create message queue resources,
* such as Pulsar Topic and Subscription, TubeMQ Topic and ConsumerGroup, etc.
@@ -41,10 +59,21 @@ import org.springframework.stereotype.Service;
@Service
public class QueueResourceListener implements QueueOperateListener {
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+ 20,
+ 40,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
+ new CallerRunsPolicy());
+
@Autowired
private InlongGroupService groupService;
@Autowired
private QueueResourceOperatorFactory queueOperatorFactory;
+ @Autowired
+ private WorkflowService workflowService;
@Override
public TaskEvent event() {
@@ -82,7 +111,10 @@ public class QueueResourceListener implements QueueOperateListener {
String operator = context.getOperator();
switch (operateType) {
case INIT:
+ // create queue resource for inlong group
queueOperator.createQueueForGroup(groupInfo, operator);
+ // create queue resource for all inlong streams under the inlong group
+ this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator);
break;
case DELETE:
queueOperator.deleteQueueForGroup(groupInfo, operator);
@@ -96,4 +128,39 @@ public class QueueResourceListener implements QueueOperateListener {
return ListenerResult.success("success");
}
+ private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+ String groupId = groupInfo.getInlongGroupId();
+ log.info("success to start stream process for groupId={}", groupId);
+
+ for (InlongStreamInfo stream : streamInfos) {
+ StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+ String streamId = stream.getInlongStreamId();
+ final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+ CompletableFuture<WorkflowResult> future = CompletableFuture
+ .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ log.error(errMsg + ": " + ex.getMessage());
+ throw new WorkflowListenerException(errMsg, ex);
+ } else {
+ List<TaskResponse> tasks = result.getNewTasks();
+ if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+ log.error(errMsg);
+ throw new WorkflowListenerException(errMsg);
+ }
+ }
+ });
+ try {
+ future.get(180, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ String msg = "failed to execute stream process in asynchronously ";
+ log.error(msg, e);
+ throw new WorkflowListenerException(msg + ": " + e.getMessage());
+ }
+ }
+
+ log.info("success to start stream process for groupId={}", groupId);
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
index 21403ec21..eb3aaab8c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
@@ -29,7 +29,6 @@ import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -46,8 +45,6 @@ public class StreamQueueResourceListener implements QueueOperateListener {
@Autowired
private InlongGroupService groupService;
@Autowired
- private InlongStreamService streamService;
- @Autowired
private QueueResourceOperatorFactory queueOperatorFactory;
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index d04a20852..08a310ddb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.sort;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -81,6 +82,10 @@ public class SortConfigListener implements SortOperateListener {
}
InlongGroupInfo groupInfo = form.getGroupInfo();
List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+ if (CollectionUtils.isEmpty(streamInfos)) {
+ LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
+ return ListenerResult.success();
+ }
int sinkCount = streamInfos.stream()
.map(stream -> stream.getSinkList() == null ? 0 : stream.getSinkList().size())
.reduce(0, Integer::sum);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index be6e60fd6..141635c46 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.listener.stream;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
@@ -63,7 +64,11 @@ public class InitStreamCompleteListener implements ProcessEventListener {
// Update status of other related configs
streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
streamService.update(streamInfo.genRequest(), operator);
- sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
+ sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+ } else {
+ sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ }
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 4e2b7246a..2f65aa735 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -54,10 +54,10 @@ public class KafkaResourceOperators implements QueueResourceOperator {
@Autowired
private InlongClusterService clusterService;
@Autowired
- private InlongStreamService streamService;
- @Autowired
private KafkaOperator kafkaOperator;
@Autowired
+ private InlongStreamService streamService;
+ @Autowired
private InlongConsumeService consumeService;
@Override
@@ -67,26 +67,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
@Override
public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
- String groupId = groupInfo.getInlongGroupId();
- log.info("begin to create kafka resource for groupId={}", groupId);
-
- InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
- try {
- // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
- List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
- if (streamInfoList == null || streamInfoList.isEmpty()) {
- log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
- return;
- }
- for (InlongStreamBriefInfo streamInfo : streamInfoList) {
- this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
- }
- } catch (Exception e) {
- String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
- log.error(msg, e);
- throw new WorkflowListenerException(msg + ": " + e.getMessage());
- }
- log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+ log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 23c13b740..2e379b285 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -107,20 +107,6 @@ public class PulsarResourceOperator implements QueueResourceOperator {
log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
groupId, namespace, clusterName);
}
-
- // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
- List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
- if (streamInfoList == null || streamInfoList.isEmpty()) {
- log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
- groupId, clusterName);
- return;
- }
- // create pulsar topic and subscription
- for (InlongStreamBriefInfo stream : streamInfoList) {
- this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
- this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(),
- stream.getInlongStreamId());
- }
} catch (Exception e) {
String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
pulsarCluster.toString());
@@ -184,7 +170,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
streamInfo.getMqResource(), streamId);
} catch (Exception e) {
String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
- groupId, streamId,pulsarCluster.getName());
+ groupId, streamId, pulsarCluster.getName());
log.error(msg, e);
throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index a4c6b5d80..2449b835e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -77,6 +77,10 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
@Override
public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
throws Exception {
+ if (isStream) {
+ LOGGER.warn("stream workflow no need to build sort config for disable zk");
+ return;
+ }
if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");
return;
@@ -84,11 +88,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
- if (isStream) {
- this.addToStreamExt(streamInfos, dataflow);
- } else {
- this.addToGroupExt(groupInfo, dataflow);
- }
+ this.addToGroupExt(groupInfo, dataflow);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index 06046eceb..eaa05ba41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -92,7 +92,8 @@ public class InlongStreamProcessService {
throw new BusinessException(errMsg);
}
- StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
+ StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo,
+ streamInfo, GroupOperateType.INIT);
ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
if (sync) {
WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -134,7 +135,8 @@ public class InlongStreamProcessService {
String.format("stream status=%s not support suspend stream for groupId=%s streamId=%s",
status, groupId, streamId));
}
- StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND);
+ StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+ GroupOperateType.SUSPEND);
ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
if (sync) {
WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -175,7 +177,8 @@ public class InlongStreamProcessService {
String.format("stream status=%s not support restart stream for groupId=%s streamId=%s",
status, groupId, streamId));
}
- StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART);
+ StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+ GroupOperateType.RESTART);
ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
if (sync) {
WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -220,7 +223,8 @@ public class InlongStreamProcessService {
String.format("stream status=%s not support delete stream for groupId=%s streamId=%s",
status, groupId, streamId));
}
- StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE);
+ StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
+ GroupOperateType.DELETE);
ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
if (sync) {
WorkflowResult workflowResult = workflowService.start(processName, operator, processForm);
@@ -242,12 +246,4 @@ public class InlongStreamProcessService {
}
}
- private StreamResourceProcessForm genStreamProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
- GroupOperateType operateType) {
- StreamResourceProcessForm processForm = new StreamResourceProcessForm();
- processForm.setGroupInfo(groupInfo);
- processForm.setStreamInfo(streamInfo);
- processForm.setGroupOperateType(operateType);
- return processForm;
- }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index d3d6bac9a..b59842f41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -189,6 +189,7 @@ public class WorkflowServiceImpl implements WorkflowService {
ProcessRequest processRequest = new ProcessRequest();
processRequest.setInlongGroupId(groupId);
+ processRequest.setInlongStreamId(query.getInlongStreamId());
processRequest.setNameList(processNameList);
processRequest.setHidden(1);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 01d500bf0..94eb0c3fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -77,14 +77,6 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
initMQTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(initMQTask);
- // Init Sink
- ServiceTask initSinkTask = new ServiceTask();
- initSinkTask.setName("InitSink");
- initSinkTask.setDisplayName("Group-InitSink");
- initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
- initSinkTask.setListenerFactory(groupTaskListenerFactory);
- process.addTask(initSinkTask);
-
// Init Sort
ServiceTask initSortTask = new ServiceTask();
initSortTask.setName("InitSort");
@@ -93,25 +85,15 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
initSortTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(initSortTask);
- // Init Source
- ServiceTask initSourceTask = new ServiceTask();
- initSourceTask.setName("InitSource");
- initSourceTask.setDisplayName("Group-InitSource");
- initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
- initSourceTask.setListenerFactory(groupTaskListenerFactory);
- process.addTask(initSourceTask);
-
// End node
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
- // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source
+ // Task dependency order: 1.MQ -> 2.Sink-in-Stream -> 3.Sort -> 4.Source-in-Stream
// To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort
startEvent.addNext(initMQTask);
- initMQTask.addNext(initSinkTask);
- initSinkTask.addNext(initSortTask);
- initSortTask.addNext(initSourceTask);
- initSourceTask.addNext(endEvent);
+ initMQTask.addNext(initSortTask);
+ initSortTask.addNext(endEvent);
return process;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 395a3dec9..5d3eb090e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -18,13 +18,13 @@
package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.service.listener.group.UpdateGroupCompleteListener;
import org.apache.inlong.manager.service.listener.group.UpdateGroupFailedListener;
import org.apache.inlong.manager.service.listener.group.UpdateGroupListener;
-import org.apache.inlong.manager.service.listener.GroupTaskListenerFactory;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -93,6 +93,8 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
deleteSortTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(deleteSortTask);
+ // No need to delete the sink because we should not affect the existing data in the sink
+
// End node
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 79f154785..6ce7822b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -18,13 +18,13 @@
package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory;
import org.apache.inlong.manager.service.listener.stream.InitStreamCompleteListener;
import org.apache.inlong.manager.service.listener.stream.InitStreamFailedListener;
import org.apache.inlong.manager.service.listener.stream.InitStreamListener;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 7dd11ca4f..415d78f3f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -18,13 +18,13 @@
package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.listener.StreamTaskListenerFactory;
import org.apache.inlong.manager.service.listener.stream.UpdateStreamCompleteListener;
import org.apache.inlong.manager.service.listener.stream.UpdateStreamFailedListener;
import org.apache.inlong.manager.service.listener.stream.UpdateStreamListener;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index 31c74a2cb..217d23841 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -37,11 +37,9 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
WorkflowProcess cloneProcess1 = process.clone();
WorkflowProcess cloneProcess2 = cloneProcess1.clone();
Assertions.assertNotSame(cloneProcess2, cloneProcess1);
- Assertions.assertNotNull(process.getTaskByName("InitSource"));
Assertions.assertNotNull(process.getTaskByName("InitMQ"));
Assertions.assertNotNull(process.getTaskByName("InitSort"));
- Assertions.assertNotNull(process.getTaskByName("InitSink"));
- Assertions.assertEquals(4, process.getNameToTaskMap().size());
+ Assertions.assertEquals(2, process.getNameToTaskMap().size());
}
}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 951d854c9..53ece0c11 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -606,7 +606,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
`display_name` varchar(256) NOT NULL COMMENT 'Process display name',
`type` varchar(256) DEFAULT NULL COMMENT 'Process classification',
`title` varchar(256) DEFAULT NULL COMMENT 'Process title',
- `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+ `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
+ `inlong_stream_id`varchar(256) DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
`applicant` varchar(256) NOT NULL COMMENT 'Applicant',
`status` varchar(64) NOT NULL COMMENT 'Status',
`form_data` mediumtext COMMENT 'Form information',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index fbd73961f..6f6a45fab 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -642,7 +642,8 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
`display_name` varchar(256) NOT NULL COMMENT 'Process display name',
`type` varchar(256) DEFAULT NULL COMMENT 'Process classification',
`title` varchar(256) DEFAULT NULL COMMENT 'Process title',
- `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+ `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
+ `inlong_stream_id`varchar(256) DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
`applicant` varchar(256) NOT NULL COMMENT 'Applicant',
`status` varchar(64) NOT NULL COMMENT 'Status',
`form_data` mediumtext COMMENT 'Form information',
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
index 9d989875e..d3b26786e 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/StartEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowAction;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.StartEvent;
@@ -91,6 +92,10 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
processEntity.setType(process.getType());
processEntity.setTitle(form.getTitle());
processEntity.setInlongGroupId(form.getInlongGroupId());
+ if (form instanceof StreamResourceProcessForm) {
+ StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+ processEntity.setInlongStreamId(streamForm.getStreamInfo().getInlongStreamId());
+ }
processEntity.setApplicant(applicant);
processEntity.setStatus(ProcessStatus.PROCESSING.name());
try {