You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2024/03/22 10:24:31 UTC

(inlong) branch dev-offline-sync updated: [INLONG-9862][Manager] Support submit flink job for offline sync (#9865)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/dev-offline-sync by this push:
     new ebbbde7602 [INLONG-9862][Manager] Support submit flink job for offline sync (#9865)
ebbbde7602 is described below

commit ebbbde7602aed8638513c7b3fbdf22c20d1bd503
Author: AloysZhang <al...@apache.org>
AuthorDate: Fri Mar 22 18:24:26 2024 +0800

    [INLONG-9862][Manager] Support submit flink job for offline sync (#9865)
---
 .../manager/common/consts/InlongConstants.java     |  4 +
 .../plugin/listener/StartupSortListener.java       | 28 ++++---
 .../manager/pojo/sort/util/StreamParseUtils.java   | 21 +++++
 .../listener/StreamTaskListenerFactory.java        | 28 +++++++
 .../schedule/StreamScheduleResourceListener.java   | 95 ++++++++++++++++++++++
 .../service/listener/sort/SortConfigListener.java  |  8 ++
 .../stream/CreateStreamWorkflowDefinition.java     | 13 ++-
 .../workflow/definition/ServiceTaskType.java       |  1 +
 .../task/ScheduleOperateListener.java}             | 36 ++++----
 .../manager/workflow/plugin/ProcessPlugin.java     |  5 ++
 10 files changed, 204 insertions(+), 35 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index e84dcc6602..f0156cd0e9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -141,6 +141,10 @@ public class InlongConstants {
      */
     public static final String DATAFLOW = "dataflow";
 
+    public static final String REGISTER_SCHEDULE_STATUS = "register.schedule.status";
+
+    public static final String REGISTERED = "registered";
+
     public static final String STREAMS = "streams";
 
     public static final String RELATIONS = "relations";
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index f6b3b6061a..fec87280d1 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -98,6 +99,14 @@ public class StartupSortListener implements SortOperateListener {
         }
 
         for (InlongStreamInfo streamInfo : streamInfos) {
+            boolean isOfflineSync = InlongConstants.DATASYNC_OFFLINE_MODE
+                    .equals(groupResourceForm.getGroupInfo().getInlongGroupMode());
+            // skip the flink job when the group mode is offline and the stream schedule is not registered yet
+            if (isOfflineSync && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) {
+                log.info("no need to submit flink job for groupId={} streamId={} as the mode is offline "
+                        + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId());
+                continue;
+            }
             List<StreamSink> sinkList = streamInfo.getSinkList();
             List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
             if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) {
@@ -131,9 +140,6 @@ public class StartupSortListener implements SortOperateListener {
                 return ListenerResult.fail(message);
             }
 
-            boolean isRealTimeSync = InlongConstants.DATASYNC_REALTIME_MODE
-                    .equals(groupResourceForm.getGroupInfo().getInlongGroupMode());
-
             FlinkInfo flinkInfo = new FlinkInfo();
 
             String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN
@@ -141,20 +147,18 @@ public class StartupSortListener implements SortOperateListener {
             flinkInfo.setJobName(jobName);
             flinkInfo.setEndpoint(kvConf.get(InlongConstants.SORT_URL));
             flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
-            if (isRealTimeSync) {
-                flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING);
-            } else {
+            if (isOfflineSync) {
                 flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH);
+            } else {
+                flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING);
             }
             FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.genPath(flinkInfo, dataflow);
-                // only start job for real-time mode
-                if (isRealTimeSync) {
-                    flinkOperation.start(flinkInfo);
-                    log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId,
-                            streamInfo.getInlongStreamId(), flinkInfo.getJobId());
-                }
+                flinkOperation.start(flinkInfo);
+                log.info("job submit success for groupId = {}, mode = {}, streamId = {}, jobId = {}",
+                        groupId, groupResourceForm.getGroupInfo().getInlongGroupMode(),
+                        streamInfo.getInlongStreamId(), flinkInfo.getJobId());
             } catch (Exception e) {
                 flinkInfo.setException(true);
                 flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
index 92470968ac..203bcc6778 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
@@ -17,11 +17,14 @@
 
 package org.apache.inlong.manager.pojo.sort.util;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.TransformType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.manager.pojo.stream.StreamPipeline;
 import org.apache.inlong.manager.pojo.stream.StreamTransform;
@@ -38,6 +41,8 @@ import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * Utils of stream parse.
@@ -154,4 +159,20 @@ public class StreamParseUtils {
         return GSON.fromJson(tempView, StreamPipeline.class);
     }
 
+    public static String getStreamExtProperty(String key, InlongStreamInfo streamInfo) {
+        if (StringUtils.isNotBlank(key) && streamInfo != null && CollectionUtils.isNotEmpty(streamInfo.getExtList())) {
+            for (InlongStreamExtInfo ext : streamInfo.getExtList()) {
+                if (key.equalsIgnoreCase(ext.getKeyName())) {
+                    return ext.getKeyValue();
+                }
+            }
+        }
+        return null;
+    }
+
+    public static boolean isRegisterScheduleSuccess(InlongStreamInfo streamInfo) {
+        return InlongConstants.REGISTERED
+                .equalsIgnoreCase(getStreamExtProperty(InlongConstants.REGISTER_SCHEDULE_STATUS, streamInfo));
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index 43fdfe4666..87c3a867c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -20,12 +20,14 @@ package org.apache.inlong.manager.service.listener;
 import org.apache.inlong.manager.common.plugin.Plugin;
 import org.apache.inlong.manager.common.plugin.PluginBinder;
 import org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener;
+import org.apache.inlong.manager.service.listener.schedule.StreamScheduleResourceListener;
 import org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener;
 import org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
 import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
@@ -53,6 +55,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
     private List<QueueOperateListener> queueOperateListeners;
     private List<SortOperateListener> sortOperateListeners;
     private List<SinkOperateListener> sinkOperateListeners;
+    private List<ScheduleOperateListener> scheduleOperateListeners;
 
     @Autowired
     private StreamQueueResourceListener queueResourceListener;
@@ -60,6 +63,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
     private StreamSortConfigListener streamSortConfigListener;
     @Autowired
     private StreamSinkResourceListener sinkResourceListener;
+    @Autowired
+    private StreamScheduleResourceListener scheduleResourceListener;
 
     @PostConstruct
     public void init() {
@@ -70,6 +75,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
         sortOperateListeners.add(streamSortConfigListener);
         sinkOperateListeners = new LinkedList<>();
         sinkOperateListeners.add(sinkResourceListener);
+        scheduleOperateListeners = new LinkedList<>();
+        scheduleOperateListeners.add(scheduleResourceListener);
     }
 
     @Override
@@ -94,6 +101,10 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
         if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) {
             sinkOperateListeners.addAll(pluginSinkOperateListeners);
         }
+        List<ScheduleOperateListener> pluginScheduleOperateListeners = processPlugin.createScheduleOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginScheduleOperateListeners)) {
+            scheduleOperateListeners.addAll(pluginScheduleOperateListeners);
+        }
     }
 
     @Override
@@ -118,6 +129,9 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
             case INIT_SINK:
                 List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext);
                 return Lists.newArrayList(sinkOperateListeners);
+            case INIT_SCHEDULE:
+                List<ScheduleOperateListener> scheduleOperateListeners = getScheduleOperateListener(workflowContext);
+                return Lists.newArrayList(scheduleOperateListeners);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
         }
@@ -131,6 +145,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
         queueOperateListeners = new LinkedList<>();
         sortOperateListeners = new LinkedList<>();
         sinkOperateListeners = new LinkedList<>();
+        scheduleOperateListeners = new LinkedList<>();
     }
 
     /**
@@ -185,4 +200,17 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
         return listeners;
     }
 
+    /**
+     * Get schedule operate listener list.
+     */
+    private List<ScheduleOperateListener> getScheduleOperateListener(WorkflowContext context) {
+        List<ScheduleOperateListener> listeners = new ArrayList<>();
+        for (ScheduleOperateListener listener : scheduleOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
+            }
+        }
+        return listeners;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
new file mode 100644
index 0000000000..61179dc343
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.listener.schedule;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+@Slf4j
+public class StreamScheduleResourceListener implements ScheduleOperateListener {
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            log.info("not add schedule stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            return false;
+        }
+
+        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+        if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
+            log.info("not add schedule stream listener, as the operate was not INIT for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+
+        log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        return InlongConstants.DATASYNC_OFFLINE_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode());
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws Exception {
+        StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
+        InlongStreamInfo streamInfo = form.getStreamInfo();
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        log.info("begin to register schedule info for groupId={}, streamId={}", groupId, streamId);
+
+        // todo: register schedule info to schedule service
+
+        // once the schedule info is registered, mark the stream as registered via an ext property
+        saveInfo(streamInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
+                InlongConstants.REGISTERED, streamInfo.getExtList());
+        log.info("success to register schedule info for group [" + groupId + "] and stream [" + streamId + "]");
+        return ListenerResult.success();
+    }
+
+    /**
+     * Save stream ext info into list.
+     */
+    private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue,
+            List<InlongStreamExtInfo> extInfoList) {
+        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+        extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
+        extInfo.setKeyName(keyName);
+        extInfo.setKeyValue(keyValue);
+        extInfoList.add(extInfo);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index ebed9e72e3..f8981bf09a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -125,6 +126,13 @@ public class SortConfigListener implements SortOperateListener {
 
         try {
             for (InlongStreamInfo streamInfo : streamInfos) {
+                // skip the sort config when the group mode is offline and the stream schedule is not registered yet
+                if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())
+                        && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) {
+                    LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline "
+                            + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId());
+                    continue;
+                }
                 List<StreamSink> sinkList = streamInfo.getSinkList();
                 if (CollectionUtils.isEmpty(sinkList)) {
                     continue;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index d3cf199a21..5505ed0694 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -102,17 +102,26 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
         initSourceTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(initSourceTask);
 
+        // Init Schedule info
+        ServiceTask initScheduleTask = new ServiceTask();
+        initScheduleTask.setName("InitSchedule");
+        initScheduleTask.setDisplayName("Stream-InitSchedule");
+        initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE);
+        initScheduleTask.setListenerFactory(streamTaskListenerFactory);
+        process.addTask(initScheduleTask);
+
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
-        // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source
+        // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source -> 5.Schedule
         // To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort
         startEvent.addNext(initMQTask);
         initMQTask.addNext(initSinkTask);
         initSinkTask.addNext(initSortTask);
         initSortTask.addNext(initSourceTask);
-        initSourceTask.addNext(endEvent);
+        initSourceTask.addNext(initScheduleTask);
+        initScheduleTask.addNext(endEvent);
 
         return process;
     }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
index 2686b4b60a..156863a676 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
@@ -25,6 +25,7 @@ public enum ServiceTaskType {
     INIT_MQ,
     INIT_SORT,
     INIT_SINK,
+    INIT_SCHEDULE,
 
     STOP_SOURCE,
     STOP_SORT,
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java
similarity index 55%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java
index 2686b4b60a..c3e162cd26 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java
@@ -15,30 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.definition;
+package org.apache.inlong.manager.workflow.event.task;
 
-import java.util.Locale;
+import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
 
-public enum ServiceTaskType {
+public interface ScheduleOperateListener extends TaskEventListener {
 
-    INIT_SOURCE,
-    INIT_MQ,
-    INIT_SORT,
-    INIT_SINK,
+    ScheduleOperateListener DEFAULT_SCHEDULE_OPERATE_LISTENER = new ScheduleOperateListener() {
 
-    STOP_SOURCE,
-    STOP_SORT,
-
-    RESTART_SOURCE,
-    RESTART_SORT,
-
-    DELETE_SOURCE,
-    DELETE_MQ,
-    DELETE_SORT;
-
-    @Override
-    public String toString() {
-        return name().toLowerCase(Locale.ROOT);
-    }
+        @Override
+        public TaskEvent event() {
+            return TaskEvent.COMPLETE;
+        }
 
+        @Override
+        public ListenerResult listen(WorkflowContext context) {
+            return ListenerResult.success();
+        }
+    };
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index 7e15d52761..452fa2bfb8 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.workflow.plugin;
 
 import org.apache.inlong.manager.common.plugin.Plugin;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
@@ -38,6 +39,10 @@ public interface ProcessPlugin extends Plugin {
         return null;
     }
 
+    default List<ScheduleOperateListener> createScheduleOperateListeners() {
+        return null;
+    }
+
     default List<QueueOperateListener> createQueueOperateListeners() {
         return null;
     }