You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2024/03/22 10:24:31 UTC
(inlong) branch dev-offline-sync updated: [INLONG-9862][Manager] Support submit flink job for offline sync (#9865)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new ebbbde7602 [INLONG-9862][Manager] Support submit flink job for offline sync (#9865)
ebbbde7602 is described below
commit ebbbde7602aed8638513c7b3fbdf22c20d1bd503
Author: AloysZhang <al...@apache.org>
AuthorDate: Fri Mar 22 18:24:26 2024 +0800
[INLONG-9862][Manager] Support submit flink job for offline sync (#9865)
---
.../manager/common/consts/InlongConstants.java | 4 +
.../plugin/listener/StartupSortListener.java | 28 ++++---
.../manager/pojo/sort/util/StreamParseUtils.java | 21 +++++
.../listener/StreamTaskListenerFactory.java | 28 +++++++
.../schedule/StreamScheduleResourceListener.java | 95 ++++++++++++++++++++++
.../service/listener/sort/SortConfigListener.java | 8 ++
.../stream/CreateStreamWorkflowDefinition.java | 13 ++-
.../workflow/definition/ServiceTaskType.java | 1 +
.../task/ScheduleOperateListener.java} | 36 ++++----
.../manager/workflow/plugin/ProcessPlugin.java | 5 ++
10 files changed, 204 insertions(+), 35 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index e84dcc6602..f0156cd0e9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -141,6 +141,10 @@ public class InlongConstants {
*/
public static final String DATAFLOW = "dataflow";
+ public static final String REGISTER_SCHEDULE_STATUS = "register.schedule.status";
+
+ public static final String REGISTERED = "registered";
+
public static final String STREAMS = "streams";
public static final String RELATIONS = "relations";
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index f6b3b6061a..fec87280d1 100644
--- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -98,6 +99,14 @@ public class StartupSortListener implements SortOperateListener {
}
for (InlongStreamInfo streamInfo : streamInfos) {
+ boolean isOfflineSync = InlongConstants.DATASYNC_OFFLINE_MODE
+ .equals(groupResourceForm.getGroupInfo().getInlongGroupMode());
+ // do not submit flink job if the group mode is offline and the stream is not config successfully
+ if (isOfflineSync && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) {
+ log.info("no need to submit flink job for groupId={} streamId={} as the mode is offline "
+ + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId());
+ continue;
+ }
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) {
@@ -131,9 +140,6 @@ public class StartupSortListener implements SortOperateListener {
return ListenerResult.fail(message);
}
- boolean isRealTimeSync = InlongConstants.DATASYNC_REALTIME_MODE
- .equals(groupResourceForm.getGroupInfo().getInlongGroupMode());
-
FlinkInfo flinkInfo = new FlinkInfo();
String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN
@@ -141,20 +147,18 @@ public class StartupSortListener implements SortOperateListener {
flinkInfo.setJobName(jobName);
flinkInfo.setEndpoint(kvConf.get(InlongConstants.SORT_URL));
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
- if (isRealTimeSync) {
- flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING);
- } else {
+ if (isOfflineSync) {
flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH);
+ } else {
+ flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING);
}
FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
- // only start job for real-time mode
- if (isRealTimeSync) {
- flinkOperation.start(flinkInfo);
- log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId,
- streamInfo.getInlongStreamId(), flinkInfo.getJobId());
- }
+ flinkOperation.start(flinkInfo);
+ log.info("job submit success for groupId = {}, mode = {}, streamId = {}, jobId = {}",
+ groupId, groupResourceForm.getGroupInfo().getInlongGroupMode(),
+ streamInfo.getInlongStreamId(), flinkInfo.getJobId());
} catch (Exception e) {
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
index 92470968ac..203bcc6778 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
@@ -17,11 +17,14 @@
package org.apache.inlong.manager.pojo.sort.util;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TransformType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.manager.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.pojo.stream.StreamTransform;
@@ -38,6 +41,8 @@ import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
/**
* Utils of stream parse.
@@ -154,4 +159,20 @@ public class StreamParseUtils {
return GSON.fromJson(tempView, StreamPipeline.class);
}
+ public static String getStreamExtProperty(String key, InlongStreamInfo streamInfo) {
+ if (StringUtils.isNotBlank(key) && streamInfo != null && CollectionUtils.isNotEmpty(streamInfo.getExtList())) {
+ for (InlongStreamExtInfo ext : streamInfo.getExtList()) {
+ if (key.equalsIgnoreCase(ext.getKeyName())) {
+ return ext.getKeyValue();
+ }
+ }
+ }
+ return null;
+ }
+
+ public static boolean isRegisterScheduleSuccess(InlongStreamInfo streamInfo) {
+ return InlongConstants.REGISTERED
+ .equalsIgnoreCase(getStreamExtProperty(InlongConstants.REGISTER_SCHEDULE_STATUS, streamInfo));
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index 43fdfe4666..87c3a867c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -20,12 +20,14 @@ package org.apache.inlong.manager.service.listener;
import org.apache.inlong.manager.common.plugin.Plugin;
import org.apache.inlong.manager.common.plugin.PluginBinder;
import org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener;
+import org.apache.inlong.manager.service.listener.schedule.StreamScheduleResourceListener;
import org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener;
import org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
@@ -53,6 +55,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
private List<QueueOperateListener> queueOperateListeners;
private List<SortOperateListener> sortOperateListeners;
private List<SinkOperateListener> sinkOperateListeners;
+ private List<ScheduleOperateListener> scheduleOperateListeners;
@Autowired
private StreamQueueResourceListener queueResourceListener;
@@ -60,6 +63,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
private StreamSortConfigListener streamSortConfigListener;
@Autowired
private StreamSinkResourceListener sinkResourceListener;
+ @Autowired
+ private StreamScheduleResourceListener scheduleResourceListener;
@PostConstruct
public void init() {
@@ -70,6 +75,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
sortOperateListeners.add(streamSortConfigListener);
sinkOperateListeners = new LinkedList<>();
sinkOperateListeners.add(sinkResourceListener);
+ scheduleOperateListeners = new LinkedList<>();
+ scheduleOperateListeners.add(scheduleResourceListener);
}
@Override
@@ -94,6 +101,10 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) {
sinkOperateListeners.addAll(pluginSinkOperateListeners);
}
+ List<ScheduleOperateListener> pluginScheduleOperateListeners = processPlugin.createScheduleOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginScheduleOperateListeners)) {
+ scheduleOperateListeners.addAll(pluginScheduleOperateListeners);
+ }
}
@Override
@@ -118,6 +129,9 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
case INIT_SINK:
List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext);
return Lists.newArrayList(sinkOperateListeners);
+ case INIT_SCHEDULE:
+ List<ScheduleOperateListener> scheduleOperateListeners = getScheduleOperateListener(workflowContext);
+ return Lists.newArrayList(scheduleOperateListeners);
default:
throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
}
@@ -131,6 +145,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
queueOperateListeners = new LinkedList<>();
sortOperateListeners = new LinkedList<>();
sinkOperateListeners = new LinkedList<>();
+ scheduleOperateListeners = new LinkedList<>();
}
/**
@@ -185,4 +200,17 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
return listeners;
}
+ /**
+ * Get schedule operate listener list.
+ */
+ private List<ScheduleOperateListener> getScheduleOperateListener(WorkflowContext context) {
+ List<ScheduleOperateListener> listeners = new ArrayList<>();
+ for (ScheduleOperateListener listener : scheduleOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
+ }
+ }
+ return listeners;
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
new file mode 100644
index 0000000000..61179dc343
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.listener.schedule;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+@Slf4j
+public class StreamScheduleResourceListener implements ScheduleOperateListener {
+
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add schedule stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
+ log.info("not add schedule stream listener, as the operate was not INIT for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+
+ log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+ return InlongConstants.DATASYNC_OFFLINE_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode());
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
+ InlongStreamInfo streamInfo = form.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ log.info("begin to register schedule info for groupId={}, streamId={}", groupId, streamId);
+
+ // todo: register schedule info to schedule service
+
+ // after register schedule info successfully, add ext property to stream info
+ saveInfo(streamInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
+ InlongConstants.REGISTERED, streamInfo.getExtList());
+ log.info("success to register schedule info for group [" + groupId + "] and stream [" + streamId + "]");
+ return ListenerResult.success();
+ }
+
+ /**
+ * Save stream ext info into list.
+ */
+ private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue,
+ List<InlongStreamExtInfo> extInfoList) {
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+ extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+ extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
+ extInfo.setKeyName(keyName);
+ extInfo.setKeyValue(keyValue);
+ extInfoList.add(extInfo);
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index ebed9e72e3..f8981bf09a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -125,6 +126,13 @@ public class SortConfigListener implements SortOperateListener {
try {
for (InlongStreamInfo streamInfo : streamInfos) {
+ // do not build sort config if the group mode is offline and the stream is not config successfully
+ if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())
+ && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) {
+ LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline "
+ + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId());
+ continue;
+ }
List<StreamSink> sinkList = streamInfo.getSinkList();
if (CollectionUtils.isEmpty(sinkList)) {
continue;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index d3cf199a21..5505ed0694 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -102,17 +102,26 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
initSourceTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(initSourceTask);
+ // Init Schedule info
+ ServiceTask initScheduleTask = new ServiceTask();
+ initScheduleTask.setName("InitSchedule");
+ initScheduleTask.setDisplayName("Stream-InitSchedule");
+ initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE);
+ initScheduleTask.setListenerFactory(streamTaskListenerFactory);
+ process.addTask(initScheduleTask);
+
// End node
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
- // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source
+ // Task dependency order: 1.MQ -> 2.Sink -> 3.Sort -> 4.Source -> 5.Schedule
// To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort
startEvent.addNext(initMQTask);
initMQTask.addNext(initSinkTask);
initSinkTask.addNext(initSortTask);
initSortTask.addNext(initSourceTask);
- initSourceTask.addNext(endEvent);
+ initSourceTask.addNext(initScheduleTask);
+ initScheduleTask.addNext(endEvent);
return process;
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
index 2686b4b60a..156863a676 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
@@ -25,6 +25,7 @@ public enum ServiceTaskType {
INIT_MQ,
INIT_SORT,
INIT_SINK,
+ INIT_SCHEDULE,
STOP_SOURCE,
STOP_SORT,
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java
similarity index 55%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java
index 2686b4b60a..c3e162cd26 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/ScheduleOperateListener.java
@@ -15,30 +15,24 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.definition;
+package org.apache.inlong.manager.workflow.event.task;
-import java.util.Locale;
+import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
-public enum ServiceTaskType {
+public interface ScheduleOperateListener extends TaskEventListener {
- INIT_SOURCE,
- INIT_MQ,
- INIT_SORT,
- INIT_SINK,
+ ScheduleOperateListener DEFAULT_SCHEDULE_OPERATE_LISTENER = new ScheduleOperateListener() {
- STOP_SOURCE,
- STOP_SORT,
-
- RESTART_SOURCE,
- RESTART_SORT,
-
- DELETE_SOURCE,
- DELETE_MQ,
- DELETE_SORT;
-
- @Override
- public String toString() {
- return name().toLowerCase(Locale.ROOT);
- }
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+ @Override
+ public ListenerResult listen(WorkflowContext context) {
+ return ListenerResult.success();
+ }
+ };
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index 7e15d52761..452fa2bfb8 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.workflow.plugin;
import org.apache.inlong.manager.common.plugin.Plugin;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
@@ -38,6 +39,10 @@ public interface ProcessPlugin extends Plugin {
return null;
}
+ default List<ScheduleOperateListener> createScheduleOperateListeners() {
+ return null;
+ }
+
default List<QueueOperateListener> createQueueOperateListeners() {
return null;
}