You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/27 10:48:01 UTC

[inlong] branch master updated: [INLONG-5212][Manager] Merge the Selector classes into their related Listeners (#5220)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cec55445a [INLONG-5212][Manager] Merge the Selector classes into their related Listeners (#5220)
cec55445a is described below

commit cec55445a1c97027a550706c36e68ff6c636ffd4
Author: healchow <he...@gmail.com>
AuthorDate: Wed Jul 27 18:47:57 2022 +0800

    [INLONG-5212][Manager] Merge the Selector classes into their related Listeners (#5220)
---
 .../manager/plugin/FlinkSortProcessPlugin.java     |  39 ++---
 .../plugin/eventselect/DeleteProcessSelector.java  |  55 -------
 .../plugin/eventselect/DeleteStreamSelector.java   |  57 --------
 .../plugin/eventselect/RestartProcessSelector.java |  55 -------
 .../plugin/eventselect/RestartStreamSelector.java  |  57 --------
 .../plugin/eventselect/StartupProcessSelector.java |  54 -------
 .../plugin/eventselect/StartupStreamSelector.java  |  55 -------
 .../plugin/eventselect/SuspendProcessSelector.java |  55 -------
 .../plugin/eventselect/SuspendStreamSelector.java  |  55 -------
 .../plugin/listener/DeleteSortListener.java        |  22 ++-
 .../plugin/listener/DeleteStreamListener.java      |  26 +++-
 .../plugin/listener/RestartSortListener.java       |  20 +++
 .../plugin/listener/RestartStreamListener.java     |  22 +++
 .../plugin/listener/StartupSortListener.java       |  19 +++
 .../plugin/listener/StartupStreamListener.java     |  24 +++-
 .../plugin/listener/SuspendSortListener.java       |  22 ++-
 .../plugin/listener/SuspendStreamListener.java     |  22 +++
 .../plugin/listener/StartupSortListenerTest.java   |   8 +-
 .../plugin/listener/SuspendSortListenerTest.java   |   6 +-
 .../service/mq/CreatePulsarGroupTaskListener.java  |  36 ++++-
 .../mq/CreatePulsarResourceTaskListener.java       |  35 ++++-
 .../mq/CreatePulsarSubscriptionTaskListener.java   |  37 +++++
 .../service/mq/CreatePulsarTopicTaskListener.java  |  35 +++++
 .../service/mq/CreateTubeGroupTaskListener.java    |  20 +++
 .../service/mq/CreateTubeTopicTaskListener.java    |  20 +++
 .../mq/DeletePulsarResourceTaskListener.java       |  34 +++++
 .../service/mq/DeletePulsarTopicTaskListener.java  |  35 +++++
 .../service/mq/PulsarResourceCreateSelector.java   |  67 ---------
 .../service/mq/PulsarResourceDeleteSelector.java   |  66 ---------
 .../service/mq/PulsarTopicCreateSelector.java      |  63 --------
 .../service/mq/PulsarTopicDeleteSelector.java      |  62 --------
 .../manager/service/mq/TubeEventSelector.java      |  50 -------
 .../resource/StreamSinkResourceListener.java       |   7 +
 .../manager/service/sort/SortConfigListener.java   |  31 ++++
 .../service/sort/StreamSortConfigListener.java     |  21 +++
 .../service/sort/ZookeeperDisabledSelector.java    |  63 --------
 .../service/sort/ZookeeperEnabledSelector.java     |  53 -------
 .../listener/AbstractSourceOperateListener.java    |   6 +-
 .../source/listener/SourceDeleteEventSelector.java |  40 ------
 .../source/listener/SourceDeleteListener.java      |  14 ++
 .../listener/SourceRestartEventSelector.java       |  40 ------
 .../source/listener/SourceRestartListener.java     |  14 ++
 .../source/listener/SourceStopEventSelector.java   |  40 ------
 .../source/listener/SourceStopListener.java        |  14 ++
 .../listener/ConsumptionPassTaskListener.java      |   2 +-
 .../group/CreateGroupWorkflowDefinition.java       |  18 +--
 .../group/DeleteGroupWorkflowDefinition.java       |  12 +-
 .../group/RestartGroupWorkflowDefinition.java      |   8 +-
 .../group/SuspendGroupWorkflowDefinition.java      |   8 +-
 .../group/listener/InitGroupCompleteListener.java  |  10 +-
 .../listener/GroupTaskListenerFactory.java         | 144 +++++++++----------
 .../listener/StreamTaskListenerFactory.java        | 159 ++++++++++-----------
 .../stream/CreateStreamWorkflowDefinition.java     |  16 +--
 .../stream/DeleteStreamWorkflowDefinition.java     |  12 +-
 .../stream/RestartStreamWorkflowDefinition.java    |   8 +-
 .../stream/SuspendStreamWorkflowDefinition.java    |   8 +-
 .../service/core/plugin/PluginClassLoaderTest.java |  10 +-
 .../service/mocks/MockDeleteSortListener.java      |  13 ++
 .../service/mocks/MockDeleteSourceListener.java    |  17 ++-
 .../inlong/manager/service/mocks/MockPlugin.java   |  67 ++-------
 .../service/mocks/MockRestartSortListener.java     |  13 ++
 .../service/mocks/MockRestartSourceListener.java   |  17 ++-
 .../service/mocks/MockStopSortListener.java        |  13 ++
 .../service/mocks/MockStopSourceListener.java      |  17 ++-
 .../workflow/GroupTaskListenerFactoryTest.java     |   2 +-
 .../service/workflow/WorkflowServiceImplTest.java  |  58 ++++----
 .../group/CreateGroupWorkflowDefinitionTest.java   |   2 +-
 .../manager-service/src/test/resources/log4j2.xml  |  46 ++++++
 .../manager/workflow/definition/ServiceTask.java   |  42 +++---
 ...tenerProvider.java => TaskListenerFactory.java} |  15 +-
 .../manager/workflow/definition/WorkflowTask.java  |   2 +-
 .../manager/workflow/event/EventSelector.java      |  31 ----
 .../workflow/event/task/QueueOperateListener.java  |   3 +-
 .../workflow/event/task/SinkOperateListener.java   |   3 +-
 .../workflow/event/task/SortOperateListener.java   |   3 +-
 ...ateListener.java => SourceOperateListener.java} |   7 +-
 .../workflow/event/task/TaskEventListener.java     |  16 ++-
 .../manager/workflow/plugin/ProcessPlugin.java     |  15 +-
 78 files changed, 982 insertions(+), 1411 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index 45cf2b1e2..896b770d4 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -18,14 +18,6 @@
 package org.apache.inlong.manager.plugin;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.plugin.eventselect.DeleteProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.DeleteStreamSelector;
-import org.apache.inlong.manager.plugin.eventselect.RestartProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.RestartStreamSelector;
-import org.apache.inlong.manager.plugin.eventselect.StartupProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.StartupStreamSelector;
-import org.apache.inlong.manager.plugin.eventselect.SuspendProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.SuspendStreamSelector;
 import org.apache.inlong.manager.plugin.listener.DeleteSortListener;
 import org.apache.inlong.manager.plugin.listener.DeleteStreamListener;
 import org.apache.inlong.manager.plugin.listener.RestartSortListener;
@@ -34,13 +26,12 @@ import org.apache.inlong.manager.plugin.listener.StartupSortListener;
 import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
 import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
 import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Plugin of flink sort process.
@@ -49,21 +40,21 @@ import java.util.Map;
 public class FlinkSortProcessPlugin implements ProcessPlugin {
 
     @Override
-    public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
-        return new LinkedHashMap<>();
+    public List<SourceOperateListener> createSourceOperateListeners() {
+        return new LinkedList<>();
     }
 
     @Override
-    public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
-        Map<SortOperateListener, EventSelector> listeners = new LinkedHashMap<>();
-        listeners.put(new DeleteSortListener(), new DeleteProcessSelector());
-        listeners.put(new RestartSortListener(), new RestartProcessSelector());
-        listeners.put(new SuspendSortListener(), new SuspendProcessSelector());
-        listeners.put(new StartupSortListener(), new StartupProcessSelector());
-        listeners.put(new DeleteStreamListener(), new DeleteStreamSelector());
-        listeners.put(new RestartStreamListener(), new RestartStreamSelector());
-        listeners.put(new SuspendStreamListener(), new SuspendStreamSelector());
-        listeners.put(new StartupStreamListener(), new StartupStreamSelector());
+    public List<SortOperateListener> createSortOperateListeners() {
+        List<SortOperateListener> listeners = new LinkedList<>();
+        listeners.add(new DeleteSortListener());
+        listeners.add(new RestartSortListener());
+        listeners.add(new SuspendSortListener());
+        listeners.add(new StartupSortListener());
+        listeners.add(new DeleteStreamListener());
+        listeners.add(new RestartStreamListener());
+        listeners.add(new SuspendStreamListener());
+        listeners.add(new StartupStreamListener());
         return listeners;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
deleted file mode 100644
index d2b9a3dd8..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of delete process event.
- */
-@Slf4j
-public class DeleteProcessSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add deleteProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
-        if (!flag) {
-            log.info("not add deleteProcess listener, as the operate was not DELETE for groupId [{}]", groupId);
-            return false;
-        }
-
-        log.info("add deleteProcess listener for groupId [{}]", groupId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
deleted file mode 100644
index 35e1543d0..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of delete stream event.
- */
-@Slf4j
-public class DeleteStreamSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add deleteStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-
-        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
-        String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
-        boolean flag = streamResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
-        if (!flag) {
-            log.info("not add deleteStream listener, as the operate was not DELETE for groupId [{}] and streamId [{}]",
-                    groupId, streamId);
-            return false;
-        }
-
-        log.info("add deleteStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
deleted file mode 100644
index fa70bb88c..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of restart process event.
- */
-@Slf4j
-public class RestartProcessSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add restartProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-
-        GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
-        if (!flag) {
-            log.info("\"not add restartProcess listener, as the operate was not RESTART for groupId [{}]", groupId);
-            return false;
-        }
-
-        log.info("add restartProcess listener for groupId [{}]", groupId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
deleted file mode 100644
index 7492dcd46..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of restart stream event.
- */
-@Slf4j
-public class RestartStreamSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add restartStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-
-        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
-        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
-        boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
-        if (!flag) {
-            log.info("not add restartStream listener, as the operate was not RESTART for groupId [{}] streamId [{}]",
-                    groupId, streamId);
-            return false;
-        }
-
-        log.info("add restartStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
deleted file mode 100644
index 2c4a93bb8..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of startup process event.
- */
-@Slf4j
-public class StartupProcessSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add startupProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-        GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.INIT;
-        if (!flag) {
-            log.info("not add startupProcess listener, as the operate was not INIT for groupId [{}]", groupId);
-            return false;
-        }
-
-        log.info("add startupProcess listener for groupId [{}]", groupId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
deleted file mode 100644
index 20dc1c9c9..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of startup stream event.
- */
-@Slf4j
-public class StartupStreamSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add startupStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
-        boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.INIT;
-        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
-        if (!flag) {
-            log.info("not add startupStream listener, as the operate was not INIT for groupId [{}] and streamId [{}]",
-                    groupId, streamId);
-            return false;
-        }
-        log.info("add startupStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
deleted file mode 100644
index 85b33096d..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of suspend process event.
- */
-@Slf4j
-public class SuspendProcessSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add suspendProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
-                    groupId);
-            return false;
-        }
-
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
-        if (!flag) {
-            log.info("not add suspendProcess listener, as the operate was not SUSPEND for groupId [{}]", groupId);
-            return false;
-        }
-
-        log.info("add suspendProcess listener for groupId [{}]", groupId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
deleted file mode 100644
index 126f507ee..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of startup stream event.
- */
-@Slf4j
-public class SuspendStreamSelector implements EventSelector {
-
-    @SneakyThrows
-    @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        ProcessForm processForm = workflowContext.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add suspendStream listener as StreamResourceProcessForm for groupId [{}]", groupId);
-            return false;
-        }
-        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
-        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
-        boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
-        if (!flag) {
-            log.info("not add suspendStream listener as the operate SUSPEND for groupId [{}] and streamId [{}]",
-                    groupId, streamId);
-            return false;
-        }
-
-        log.info("add suspendStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index c28ddd349..60569c3ca 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -21,11 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -53,6 +54,25 @@ public class DeleteSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only DELETE operations that are carried by a GroupResourceProcessForm.
+        final ProcessForm form = context.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (form instanceof GroupResourceProcessForm) {
+            GroupOperateType operateType = ((GroupResourceProcessForm) form).getGroupOperateType();
+            if (operateType == GroupOperateType.DELETE) {
+                log.info("add delete group listener for groupId [{}]", groupId);
+                return true;
+            }
+            log.info("not add delete group listener, as the operate was not DELETE for groupId [{}]", groupId);
+            return false;
+        }
+
+        log.info("not add delete group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 188d3bbd7..49350a720 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -21,13 +21,14 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -55,6 +56,27 @@ public class DeleteStreamListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only DELETE operations that are carried by a StreamResourceProcessForm.
+        final ProcessForm form = context.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (!(form instanceof StreamResourceProcessForm)) {
+            log.info("not add delete stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            return false;
+        }
+        final StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+        final String streamId = streamForm.getStreamInfo().getInlongStreamId();
+        final boolean isDelete = streamForm.getGroupOperateType() == GroupOperateType.DELETE;
+        if (isDelete) {
+            log.info("add delete stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        } else {
+            log.info("not add delete stream listener, as the operate was not DELETE for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+        }
+        return isDelete;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
@@ -69,7 +91,7 @@ public class DeleteStreamListener implements SortOperateListener {
         final String streamId = streamInfo.getInlongStreamId();
         Map<String, String> kvConf = groupExtList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
-        streamExtList.stream().forEach(extInfo -> {
+        streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 6ef6f2a6c..1ba7fdd69 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -54,6 +55,25 @@ public class RestartSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext workflowContext) {
+        // Accept only RESTART operations that are carried by a GroupResourceProcessForm.
+        final ProcessForm form = workflowContext.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (form instanceof GroupResourceProcessForm) {
+            GroupOperateType operateType = ((GroupResourceProcessForm) form).getGroupOperateType();
+            if (operateType == GroupOperateType.RESTART) {
+                log.info("add restart group listener for groupId [{}]", groupId);
+                return true;
+            }
+            log.info("not add restart group listener, as the operate was not RESTART for groupId [{}]", groupId);
+            return false;
+        }
+
+        log.info("not add restart group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 157153e32..cfc76da5e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -56,6 +57,27 @@ public class RestartStreamListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext workflowContext) {
+        // Accept only RESTART operations that are carried by a StreamResourceProcessForm.
+        final ProcessForm form = workflowContext.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (!(form instanceof StreamResourceProcessForm)) {
+            log.info("not add restart stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            return false;
+        }
+        final StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+        final String streamId = streamForm.getStreamInfo().getInlongStreamId();
+        final boolean isRestart = streamForm.getGroupOperateType() == GroupOperateType.RESTART;
+        if (isRestart) {
+            log.info("add restart stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        } else {
+            log.info("not add restart stream listener, as the operate was not RESTART for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+        }
+        return isRestart;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 5c80896a7..5fdbde1a2 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -55,6 +56,24 @@ public class StartupSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext workflowContext) {
+        // Accept only INIT operations that are carried by a GroupResourceProcessForm.
+        final ProcessForm form = workflowContext.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (form instanceof GroupResourceProcessForm) {
+            GroupOperateType operateType = ((GroupResourceProcessForm) form).getGroupOperateType();
+            if (operateType == GroupOperateType.INIT) {
+                log.info("add startup group listener for groupId [{}]", groupId);
+                return true;
+            }
+            log.info("not add startup group listener, as the operate was not INIT for groupId [{}]", groupId);
+            return false;
+        }
+        log.info("not add startup group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index adbbf11ef..40856aa82 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -57,6 +58,27 @@ public class StartupStreamListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext workflowContext) {
+        // Accept only INIT operations that are carried by a StreamResourceProcessForm.
+        final ProcessForm form = workflowContext.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (!(form instanceof StreamResourceProcessForm)) {
+            log.info("not add startup stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            return false;
+        }
+        final StreamResourceProcessForm streamForm = (StreamResourceProcessForm) form;
+        final String streamId = streamForm.getStreamInfo().getInlongStreamId();
+        final boolean isInit = streamForm.getGroupOperateType() == GroupOperateType.INIT;
+        if (isInit) {
+            log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        } else {
+            log.info("not add startup stream listener, as the operate was not INIT for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+        }
+        return isInit;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
@@ -77,7 +99,7 @@ public class StartupStreamListener implements SortOperateListener {
 
         Map<String, String> kvConf = groupExtList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
-        streamExtList.stream().forEach(extInfo -> {
+        streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index ddf1a5cc7..c55973694 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -21,11 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -53,6 +54,25 @@ public class SuspendSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext workflowContext) {
+        // Accept only SUSPEND operations that are carried by a GroupResourceProcessForm.
+        final ProcessForm form = workflowContext.getProcessForm();
+        final String groupId = form.getInlongGroupId();
+        if (form instanceof GroupResourceProcessForm) {
+            GroupOperateType operateType = ((GroupResourceProcessForm) form).getGroupOperateType();
+            if (operateType == GroupOperateType.SUSPEND) {
+                log.info("add suspend group listener for groupId [{}]", groupId);
+                return true;
+            }
+            log.info("not add suspend group listener, as the operate was not SUSPEND for groupId [{}]", groupId);
+            return false;
+        }
+
+        log.info("not add suspend group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index a0867c168..19a226b6b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -55,6 +56,27 @@ public class SuspendStreamListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext workflowContext) {
+        ProcessForm processForm = workflowContext.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            log.info("not add suspend stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+            return false;
+        }
+        // Accept only SUSPEND operations that are carried by a StreamResourceProcessForm.
+        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+        if (streamProcessForm.getGroupOperateType() != GroupOperateType.SUSPEND) {
+            log.info("not add suspend stream listener, as the operate was not SUSPEND for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+
+        log.info("add suspend stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+        return true;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
index c47f5f71c..e79171afc 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
@@ -37,14 +37,12 @@ public class StartupSortListenerTest {
 
     @Test
     public void testListener() throws Exception {
-
         WorkflowContext context = new WorkflowContext();
-        GroupResourceProcessForm groupResourceProcessForm = new GroupResourceProcessForm();
-
-        context.setProcessForm(groupResourceProcessForm);
+        GroupResourceProcessForm groupResourceForm = new GroupResourceProcessForm();
+        context.setProcessForm(groupResourceForm);
         InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
         pulsarInfo.setInlongGroupId("1");
-        groupResourceProcessForm.setGroupInfo(pulsarInfo);
+        groupResourceForm.setGroupInfo(pulsarInfo);
 
         List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index 087481ece..7d4f18cd7 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -38,11 +38,11 @@ public class SuspendSortListenerTest {
     @Test
     public void testListener() throws Exception {
         WorkflowContext context = new WorkflowContext();
-        GroupResourceProcessForm groupResourceProcessForm = new GroupResourceProcessForm();
-        context.setProcessForm(groupResourceProcessForm);
+        GroupResourceProcessForm groupResourceForm = new GroupResourceProcessForm();
+        context.setProcessForm(groupResourceForm);
         InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
         pulsarInfo.setInlongGroupId("1");
-        groupResourceProcessForm.setGroupInfo(pulsarInfo);
+        groupResourceForm.setGroupInfo(pulsarInfo);
 
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
         inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index 6adc8fcea..e04db57ea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -19,15 +19,19 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -67,6 +71,36 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only the INIT operation carried by a GroupResourceProcessForm.
+        final ProcessForm rawForm = context.getProcessForm();
+        if (!(rawForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        final GroupResourceProcessForm groupForm = (GroupResourceProcessForm) rawForm;
+        if (groupForm.getGroupOperateType() != GroupOperateType.INIT) {
+            return false;
+        }
+
+        final String groupId = groupForm.getInlongGroupId();
+        final InlongGroupInfo info = groupForm.getGroupInfo();
+        final MQType mqType = MQType.forType(info.getMqType());
+        if (mqType != MQType.PULSAR && mqType != MQType.TDMQ_PULSAR) {
+            log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
+            return false;
+        }
+
+        // For the Pulsar-based MQ types, the group's enableCreateResource value decides.
+        final InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) info;
+        if (InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource())) {
+            log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
+            return true;
+        }
+        log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index ad05d1e87..41a97b2c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -19,7 +19,10 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -28,7 +31,7 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -66,6 +69,36 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only the INIT operation carried by a GroupResourceProcessForm.
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        if (form.getGroupOperateType() != GroupOperateType.INIT) {
+            return false;
+        }
+        String groupId = form.getInlongGroupId();
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        MQType mqType = MQType.forType(groupInfo.getMqType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+            if (enable) {
+                log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
+                return true;
+            } else {
+                log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
+                return false;
+            }
+        }
+        // Any MQ type other than PULSAR / TDMQ_PULSAR is rejected here.
+        log.warn("skip to create pulsar resource as the mq type is {} for groupId [{}]", mqType, groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
index 262076451..7cbe50fd6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
@@ -18,14 +18,19 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
@@ -63,6 +68,38 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
         return TaskEvent.COMPLETE;
     }
 
+    /**
+     * Accepts only INIT stream-resource forms of Pulsar / TDMQ Pulsar groups with resource creation enabled.
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            return false;
+        }
+        StreamResourceProcessForm form = (StreamResourceProcessForm) processForm;
+        if (form.getGroupOperateType() != GroupOperateType.INIT) {
+            return false;
+        }
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        MQType mqType = MQType.forType(groupInfo.getMqType());
+        String groupId = groupInfo.getInlongGroupId();
+        String streamId = form.getStreamInfo().getInlongStreamId();
+        if (mqType != MQType.PULSAR && mqType != MQType.TDMQ_PULSAR) {
+            return false;
+        }
+        // NOTE(review): assumes a Pulsar-typed group always carries an InlongPulsarInfo; the cast is unchecked
+        InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+        if (InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource())) {
+            log.info("need to create pulsar subscription as the createResource was true"
+                    + " for groupId [{}] streamId [{}]", groupId, streamId);
+            return true;
+        }
+        log.info("skip to create pulsar subscription as the createResource was false"
+                + " for groupId [{}] streamId [{}]", groupId, streamId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 1d18994ee..6ecafd029 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -28,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.mq.util.PulsarOperator;
@@ -57,6 +60,38 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /**
+     * Decides whether this listener applies: INIT stream forms of Pulsar-typed groups with createResource enabled.
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm rawForm = context.getProcessForm();
+        if (!(rawForm instanceof StreamResourceProcessForm)) {
+            return false;
+        }
+        StreamResourceProcessForm streamForm = (StreamResourceProcessForm) rawForm;
+        if (GroupOperateType.INIT != streamForm.getGroupOperateType()) {
+            return false;
+        }
+        InlongGroupInfo group = streamForm.getGroupInfo();
+        MQType type = MQType.forType(group.getMqType());
+        String groupId = group.getInlongGroupId();
+        String streamId = streamForm.getStreamInfo().getInlongStreamId();
+        if (type != MQType.PULSAR && type != MQType.TDMQ_PULSAR) {
+            return false;
+        }
+        // Pulsar-typed group: the create-resource switch is read from the Pulsar-specific group info
+        InlongPulsarInfo pulsarGroup = (InlongPulsarInfo) group;
+        if (!InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarGroup.getEnableCreateResource())) {
+            log.info("skip to create pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+        log.info("need to create pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
+                groupId, streamId);
+        return true;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
index 437005eae..187fdb845 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
@@ -19,9 +19,11 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
@@ -52,6 +54,24 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /** Accepts group-resource forms whose group uses TubeMQ; any other form or MQ type is skipped. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        String groupId = form.getInlongGroupId();
+        MQType mqType = MQType.forType(form.getGroupInfo().getMqType());
+        if (mqType == MQType.TUBE) {
+            log.info("need to create tube resource for groupId [{}]", groupId);
+            return true;
+        }
+        log.warn("skip to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index 4c422c791..f4d6dd204 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -19,11 +19,13 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -50,6 +52,24 @@ public class CreateTubeTopicTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /** Accepts group-resource forms whose group uses TubeMQ; any other form or MQ type is skipped. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        String groupId = form.getInlongGroupId();
+        MQType mqType = MQType.forType(form.getGroupInfo().getMqType());
+        if (mqType == MQType.TUBE) {
+            log.info("need to create tube resource for groupId [{}]", groupId);
+            return true;
+        }
+        log.warn("skip to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
index f63afe0e9..978d75fbd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
@@ -18,7 +18,10 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -27,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -64,6 +68,36 @@ public class DeletePulsarResourceTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /** Accepts DELETE group-resource forms of Pulsar / TDMQ Pulsar groups that have createResource enabled. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        GroupOperateType operateType = form.getGroupOperateType();
+        if (operateType != GroupOperateType.DELETE) {
+            return false;
+        }
+        String groupId = form.getInlongGroupId();
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        MQType mqType = MQType.forType(groupInfo.getMqType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+            if (enable) {
+                log.info("need to delete pulsar resource as the createResource was true for groupId [{}]", groupId);
+                return true;
+            } else {
+                log.info("skip to delete pulsar resource as the createResource was false for groupId [{}]", groupId);
+                return false;
+            }
+        }
+        log.warn("skip to delete pulsar resource as the mq type is {} for groupId [{}]", mqType, groupId);
+        return false;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
index 9530d679b..4b77a86af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -28,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.mq.util.PulsarOperator;
@@ -57,6 +60,38 @@ public class DeletePulsarTopicTaskListener implements QueueOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /**
+     * Decides whether this listener applies: DELETE stream forms of Pulsar-typed groups with createResource enabled.
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm rawForm = context.getProcessForm();
+        if (!(rawForm instanceof StreamResourceProcessForm)) {
+            return false;
+        }
+        StreamResourceProcessForm streamForm = (StreamResourceProcessForm) rawForm;
+        if (GroupOperateType.DELETE != streamForm.getGroupOperateType()) {
+            return false;
+        }
+        InlongGroupInfo group = streamForm.getGroupInfo();
+        MQType type = MQType.forType(group.getMqType());
+        String groupId = group.getInlongGroupId();
+        String streamId = streamForm.getStreamInfo().getInlongStreamId();
+        if (type != MQType.PULSAR && type != MQType.TDMQ_PULSAR) {
+            return false;
+        }
+        // Pulsar-typed group: the create-resource switch is read from the Pulsar-specific group info
+        InlongPulsarInfo pulsarGroup = (InlongPulsarInfo) group;
+        if (!InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarGroup.getEnableCreateResource())) {
+            log.info("skip to delete pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+        log.info("need to delete pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
+                groupId, streamId);
+        return true;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
deleted file mode 100644
index 8c2015fbe..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of pulsar event for creating pulsar resource.
- */
-@Slf4j
-public class PulsarResourceCreateSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        GroupOperateType operateType = form.getGroupOperateType();
-        if (operateType != GroupOperateType.INIT) {
-            return false;
-        }
-        String groupId = form.getInlongGroupId();
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        MQType mqType = MQType.forType(groupInfo.getMqType());
-        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
-            if (enable) {
-                log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
-                return true;
-            } else {
-                log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
-                return false;
-            }
-        }
-
-        log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
-        return false;
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
deleted file mode 100644
index 8ceabd306..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of pulsar event for deleting pulsar resource
- */
-@Slf4j
-public class PulsarResourceDeleteSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        GroupOperateType operateType = form.getGroupOperateType();
-        if (operateType != GroupOperateType.DELETE) {
-            return false;
-        }
-        String groupId = form.getInlongGroupId();
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        MQType mqType = MQType.forType(groupInfo.getMqType());
-        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
-            if (enable) {
-                log.info("need to delete pulsar resource as the createResource was true for groupId [{}]", groupId);
-                return true;
-            } else {
-                log.info("skip to delete pulsar resource as the createResource was false for groupId [{}]", groupId);
-                return false;
-            }
-        }
-
-        log.warn("skip to delete pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
-        return false;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
deleted file mode 100644
index e27a721a6..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-@Slf4j
-public class PulsarTopicCreateSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            return false;
-        }
-        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
-        GroupOperateType operateType = streamResourceProcessForm.getGroupOperateType();
-        if (operateType != GroupOperateType.INIT) {
-            return false;
-        }
-        MQType mqType = MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
-        String groupId = streamResourceProcessForm.getGroupInfo().getInlongGroupId();
-        String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
-        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) streamResourceProcessForm.getGroupInfo();
-            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
-            if (enable) {
-                log.info("need to create pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
-                        groupId, streamId);
-                return true;
-            } else {
-                log.info("skip to create pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
-                        groupId, streamId);
-                return false;
-            }
-        }
-        return false;
-
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
deleted file mode 100644
index 43c1d22bb..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-@Slf4j
-public class PulsarTopicDeleteSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            return false;
-        }
-        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
-        GroupOperateType operateType = streamResourceProcessForm.getGroupOperateType();
-        if (operateType != GroupOperateType.DELETE) {
-            return false;
-        }
-        MQType mqType = MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
-        String groupId = streamResourceProcessForm.getGroupInfo().getInlongGroupId();
-        String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
-        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) streamResourceProcessForm.getGroupInfo();
-            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
-            if (enable) {
-                log.info("need to delete pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
-                        groupId, streamId);
-                return true;
-            } else {
-                log.info("skip to delete pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
-                        groupId, streamId);
-                return false;
-            }
-        }
-        return false;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/TubeEventSelector.java
deleted file mode 100644
index 410a344fd..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/TubeEventSelector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of tube event for create tube source.
- */
-@Slf4j
-public class TubeEventSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        String groupId = form.getInlongGroupId();
-        MQType mqType = MQType.forType(form.getGroupInfo().getMqType());
-        if (mqType == MQType.TUBE) {
-            log.info("need to create tube resource for groupId [{}]", groupId);
-            return true;
-        }
-
-        log.warn("skip to to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
-        return false;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
index 242c3bf2a..7d9ebe220 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -54,6 +55,12 @@ public class StreamSinkResourceListener implements SinkOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /** Accepts the task only when the context's {@link ProcessForm} is a {@link StreamResourceProcessForm}. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        return context.getProcessForm() instanceof StreamResourceProcessForm;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
index 14d8f3258..b97d13e72 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
@@ -17,11 +17,15 @@
 
 package org.apache.inlong.manager.service.sort;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +54,33 @@ public class SortConfigListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /**
+     * Decides whether this listener should run for the given workflow context.
+     * Only a {@link GroupResourceProcessForm} is accepted, because {@link #listen(WorkflowContext)} casts the
+     * form to that type; any other form, including {@link StreamResourceProcessForm}, is rejected.
+     * Throws a NullPointerException if the context carries no process form, as the group id is read first.
+     *
+     * @param context workflow context carrying the process form
+     * @return true only for a group form whose enableZookeeper equals DISABLE_ZK and whose MQ type is not NONE
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            // Accepting a non-group form here would end in a ClassCastException inside listen().
+            LOGGER.info("zk disabled for groupId [{}]", groupId);
+            return false;
+        }
+
+        InlongGroupInfo groupInfo = ((GroupResourceProcessForm) processForm).getGroupInfo();
+        // Both conditions must hold: the ZooKeeper flag matches DISABLE_ZK and an MQ type other than NONE is set.
+        boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
+                && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
+        LOGGER.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
+        return enable;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
index dda1086fc..6f4293a65 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
@@ -18,11 +18,15 @@
 package org.apache.inlong.manager.service.sort;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -53,6 +57,23 @@ public class StreamSortConfigListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    /** Accepts {@link StreamResourceProcessForm} (as cast in listen()), not {@link GroupResourceProcessForm}. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            LOGGER.warn("zookeeper enabled was [false] for groupId [{}]", groupId);
+            return false;
+        }
+        // NOTE(review): ENABLE_ZK is carried over unchanged; confirm it is the intended flag for stream forms.
+        InlongGroupInfo groupInfo = ((StreamResourceProcessForm) processForm).getGroupInfo();
+        boolean enable = InlongConstants.ENABLE_ZK.equals(groupInfo.getEnableZookeeper())
+                && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
+        LOGGER.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
+        return enable;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperDisabledSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperDisabledSelector.java
deleted file mode 100644
index cac93aa5f..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperDisabledSelector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Event selector for whether ZooKeeper is disabled.
- */
-@Slf4j
-public class ZookeeperDisabledSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (processForm instanceof GroupResourceProcessForm) {
-            GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
-            InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-            boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
-                    && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
-
-            log.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
-            return enable;
-        } else if (processForm instanceof StreamResourceProcessForm) {
-            StreamResourceProcessForm streamResourceForm = (StreamResourceProcessForm) processForm;
-            InlongGroupInfo groupInfo = streamResourceForm.getGroupInfo();
-            InlongStreamInfo streamInfo = streamResourceForm.getStreamInfo();
-            boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
-                    && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
-            log.info("zookeeper disabled was [{}] for groupId [{}] and streamId [{}] ", enable, groupId,
-                    streamInfo.getInlongStreamId());
-            return enable;
-        } else {
-            log.info("zk disabled for groupId [{}]", groupId);
-            return false;
-        }
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperEnabledSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperEnabledSelector.java
deleted file mode 100644
index 2c3d21952..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperEnabledSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Event selector for whether ZooKeeper is enabled.
- */
-@Slf4j
-@Deprecated
-public class ZookeeperEnabledSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.warn("zookeeper enabled was [false] for groupId [{}]", groupId);
-            return false;
-        }
-
-        GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-        boolean enable = InlongConstants.ENABLE_ZK.equals(groupInfo.getEnableZookeeper())
-                && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
-        log.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
-        return enable;
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
index cd62aceec..9ec754ceb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
@@ -40,7 +40,7 @@ import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit;
  */
 @Slf4j
 @Component
-public abstract class AbstractSourceOperateListener implements DataSourceOperateListener {
+public abstract class AbstractSourceOperateListener implements SourceOperateListener {
 
     @Autowired
     protected InlongStreamService streamService;
@@ -146,7 +146,7 @@ public abstract class AbstractSourceOperateListener implements DataSourceOperate
                 return CommonBeanUtils.copyProperties((KafkaSource) streamSource, KafkaSourceRequest::new);
             default:
                 throw new IllegalArgumentException(
-                        String.format("Unsupported type=%s for DataSourceOperateListener", type));
+                        String.format("Unsupported type=%s for SourceOperateListener", type));
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteEventSelector.java
deleted file mode 100644
index 087892ed5..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteEventSelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.source.listener;
-
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of source delete event.
- */
-public class SourceDeleteEventSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        return groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
index 16faeebb1..0277cc9d4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
@@ -18,7 +18,11 @@
 package org.apache.inlong.manager.service.source.listener;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.springframework.stereotype.Component;
 
 /**
@@ -33,6 +37,16 @@ public class SourceDeleteListener extends AbstractSourceOperateListener {
         return getClass().getSimpleName();
     }
 
+    /** Accepts only group resource forms whose operate type is {@link GroupOperateType#DELETE}. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        final ProcessForm form = context.getProcessForm();
+        if (form instanceof GroupResourceProcessForm) {
+            return GroupOperateType.DELETE == ((GroupResourceProcessForm) form).getGroupOperateType();
+        }
+        return false;
+    }
+
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String operator) {
         streamSourceService.delete(sourceRequest.getId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartEventSelector.java
deleted file mode 100644
index 0d3b2f57b..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartEventSelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.source.listener;
-
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of source restart event.
- */
-public class SourceRestartEventSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        return groupResourceProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
index 7422c2312..ea5071dfd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
@@ -17,7 +17,11 @@
 
 package org.apache.inlong.manager.service.source.listener;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.springframework.stereotype.Component;
 
 /**
@@ -31,6 +35,16 @@ public class SourceRestartListener extends AbstractSourceOperateListener {
         return getClass().getSimpleName();
     }
 
+    /** Accepts only group resource forms whose operate type is {@link GroupOperateType#RESTART}. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        final ProcessForm form = context.getProcessForm();
+        if (form instanceof GroupResourceProcessForm) {
+            return GroupOperateType.RESTART == ((GroupResourceProcessForm) form).getGroupOperateType();
+        }
+        return false;
+    }
+
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String operator) {
         streamSourceService.restart(sourceRequest.getId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopEventSelector.java
deleted file mode 100644
index 9d01cb017..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopEventSelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.source.listener;
-
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of source stop event.
- */
-public class SourceStopEventSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        return groupResourceProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
index 6203fccca..4d985ba10 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
@@ -17,7 +17,11 @@
 
 package org.apache.inlong.manager.service.source.listener;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.springframework.stereotype.Component;
 
 /**
@@ -31,6 +35,16 @@ public class SourceStopListener extends AbstractSourceOperateListener {
         return getClass().getSimpleName();
     }
 
+    /** Accepts only group resource forms whose operate type is {@link GroupOperateType#SUSPEND}. */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        final ProcessForm form = context.getProcessForm();
+        if (form instanceof GroupResourceProcessForm) {
+            return GroupOperateType.SUSPEND == ((GroupResourceProcessForm) form).getGroupOperateType();
+        }
+        return false;
+    }
+
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String operator) {
         streamSourceService.stop(sourceRequest.getId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
index c50bafd26..cefa3e7d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
@@ -23,8 +23,8 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 421bca1c0..b582658ca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.service.workflow.group;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.enums.ProcessName;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.service.workflow.group.listener.InitGroupCompleteListener;
 import org.apache.inlong.manager.service.workflow.group.listener.InitGroupFailedListener;
@@ -73,32 +73,32 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
         ServiceTask initMQTask = new ServiceTask();
         initMQTask.setName("InitMQ");
         initMQTask.setDisplayName("Group-InitMQ");
-        initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
-        initMQTask.addListenerProvider(groupTaskListenerFactory);
+        initMQTask.setServiceTaskType(ServiceTaskType.INIT_MQ);
+        initMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initMQTask);
 
         // Init Sink
         ServiceTask initSinkTask = new ServiceTask();
         initSinkTask.setName("InitSink");
         initSinkTask.setDisplayName("Group-InitSink");
-        initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
-        initSinkTask.addListenerProvider(groupTaskListenerFactory);
+        initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
+        initSinkTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initSinkTask);
 
         // Init Sort
         ServiceTask initSortTask = new ServiceTask();
         initSortTask.setName("InitSort");
         initSortTask.setDisplayName("Group-InitSort");
-        initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
-        initSortTask.addListenerProvider(groupTaskListenerFactory);
+        initSortTask.setServiceTaskType(ServiceTaskType.INIT_SORT);
+        initSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initSortTask);
 
         // Init Source
         ServiceTask initSourceTask = new ServiceTask();
         initSourceTask.setName("InitSource");
         initSourceTask.setDisplayName("Group-InitSource");
-        initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
-        initSourceTask.addListenerProvider(groupTaskListenerFactory);
+        initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
+        initSourceTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initSourceTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 0bb526bde..3c17ecad3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -73,24 +73,24 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
         ServiceTask deleteSourceTask = new ServiceTask();
         deleteSourceTask.setName("DeleteSource");
         deleteSourceTask.setDisplayName("Group-DeleteSource");
-        deleteSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
-        deleteSourceTask.addListenerProvider(groupTaskListenerFactory);
+        deleteSourceTask.setServiceTaskType(ServiceTaskType.DELETE_SOURCE);
+        deleteSourceTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteSourceTask);
 
         // Delete MQ
         ServiceTask deleteMQTask = new ServiceTask();
         deleteMQTask.setName("DeleteMQ");
         deleteMQTask.setDisplayName("Group-DeleteMQ");
-        deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
-        deleteMQTask.addListenerProvider(groupTaskListenerFactory);
+        deleteMQTask.setServiceTaskType(ServiceTaskType.DELETE_MQ);
+        deleteMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteMQTask);
 
         // Delete Sort
         ServiceTask deleteSortTask = new ServiceTask();
         deleteSortTask.setName("DeleteSort");
         deleteSortTask.setDisplayName("Group-DeleteSort");
-        deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
-        deleteSortTask.addListenerProvider(groupTaskListenerFactory);
+        deleteSortTask.setServiceTaskType(ServiceTaskType.DELETE_SORT);
+        deleteSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteSortTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index a8d1afb27..4449b78e5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
         ServiceTask restartSortTask = new ServiceTask();
         restartSortTask.setName("RestartSort");
         restartSortTask.setDisplayName("Group-RestartSort");
-        restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
-        restartSortTask.addListenerProvider(taskListenerFactory);
+        restartSortTask.setServiceTaskType(ServiceTaskType.RESTART_SORT);
+        restartSortTask.setListenerFactory(taskListenerFactory);
         process.addTask(restartSortTask);
 
         // Restart Source
         ServiceTask restartSourceTask = new ServiceTask();
         restartSourceTask.setName("RestartSource");
         restartSourceTask.setDisplayName("Group-RestartSource");
-        restartSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
-        restartSourceTask.addListenerProvider(taskListenerFactory);
+        restartSourceTask.setServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+        restartSourceTask.setListenerFactory(taskListenerFactory);
         process.addTask(restartSourceTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
index e59819b3f..d9aaf588e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
         ServiceTask stopSourceTask = new ServiceTask();
         stopSourceTask.setName("StopSource");
         stopSourceTask.setDisplayName("Group-StopSource");
-        stopSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
-        stopSourceTask.addListenerProvider(groupTaskListenerFactory);
+        stopSourceTask.setServiceTaskType(ServiceTaskType.STOP_SOURCE);
+        stopSourceTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(stopSourceTask);
 
         // Stop Sort
         ServiceTask stopSortTask = new ServiceTask();
         stopSortTask.setName("StopSort");
         stopSortTask.setDisplayName("Group-StopSort");
-        stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
-        stopSortTask.addListenerProvider(groupTaskListenerFactory);
+        stopSortTask.setServiceTaskType(ServiceTaskType.STOP_SORT);
+        stopSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(stopSortTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
index aed5385d9..eb8179e67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
@@ -24,7 +24,10 @@ import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
@@ -48,6 +51,8 @@ public class InitGroupCompleteListener implements ProcessEventListener {
     private InlongStreamService streamService;
     @Autowired
     private StreamSourceService sourceService;
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
 
     @Override
     public ProcessEvent event() {
@@ -70,7 +75,10 @@ public class InitGroupCompleteListener implements ProcessEventListener {
         InlongGroupInfo groupInfo = form.getGroupInfo();
         String operator = context.getOperator();
         groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        groupService.update(groupInfo.genRequest(), operator);
+        InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId);
+        InlongGroupRequest updateGroupRequest = groupInfo.genRequest();
+        updateGroupRequest.setVersion(existGroup.getVersion());
+        groupService.update(updateGroupRequest, operator);
 
         // update status of other related configs
         streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index 650bef608..552f5e1e9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -19,31 +19,23 @@ package org.apache.inlong.manager.service.workflow.listener;
 
 import com.google.common.collect.Lists;
 import lombok.Data;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
 import org.apache.inlong.manager.service.mq.DeletePulsarResourceTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarResourceCreateSelector;
-import org.apache.inlong.manager.service.mq.PulsarResourceDeleteSelector;
-import org.apache.inlong.manager.service.mq.TubeEventSelector;
 import org.apache.inlong.manager.service.resource.SinkResourceListener;
 import org.apache.inlong.manager.service.sort.SortConfigListener;
-import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
-import org.apache.inlong.manager.service.source.listener.SourceDeleteEventSelector;
 import org.apache.inlong.manager.service.source.listener.SourceDeleteListener;
-import org.apache.inlong.manager.service.source.listener.SourceRestartEventSelector;
 import org.apache.inlong.manager.service.source.listener.SourceRestartListener;
-import org.apache.inlong.manager.service.source.listener.SourceStopEventSelector;
 import org.apache.inlong.manager.service.source.listener.SourceStopListener;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskListenerProvider;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.apache.inlong.manager.workflow.plugin.Plugin;
 import org.apache.inlong.manager.workflow.plugin.PluginBinder;
@@ -54,22 +46,19 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 /**
- * Service task listener factory.
+ * The TaskEventListener factory for InlongGroup.
  */
 @Data
 @Component
-public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListenerProvider {
+public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFactory {
 
-    private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
-
-    private Map<QueueOperateListener, EventSelector> queueOperateListeners;
-
-    private Map<SortOperateListener, EventSelector> sortOperateListeners;
+    private List<SourceOperateListener> sourceOperateListeners;
+    private List<QueueOperateListener> queueOperateListeners;
+    private List<SortOperateListener> sortOperateListeners;
 
     @Autowired
     private SourceStopListener sourceStopListener;
@@ -77,7 +66,6 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
     private SourceRestartListener sourceRestartListener;
     @Autowired
     private SourceDeleteListener sourceDeleteListener;
-
     @Autowired
     private CreateTubeTopicTaskListener createTubeTopicTaskListener;
     @Autowired
@@ -88,7 +76,6 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
     private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
     @Autowired
     private DeletePulsarResourceTaskListener deletePulsarResourceTaskListener;
-
     @Autowired
     private SinkResourceListener sinkResourceListener;
     @Autowired
@@ -96,29 +83,43 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
 
     @PostConstruct
     public void init() {
-        sourceOperateListeners = new LinkedHashMap<>();
-        sourceOperateListeners.put(sourceStopListener, new SourceStopEventSelector());
-        sourceOperateListeners.put(sourceDeleteListener, new SourceDeleteEventSelector());
-        sourceOperateListeners.put(sourceRestartListener, new SourceRestartEventSelector());
-        queueOperateListeners = new LinkedHashMap<>();
-        queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
-        queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
-        queueOperateListeners.put(createPulsarResourceTaskListener, new PulsarResourceCreateSelector());
-        queueOperateListeners.put(createPulsarGroupTaskListener, new PulsarResourceCreateSelector());
-        queueOperateListeners.put(deletePulsarResourceTaskListener, new PulsarResourceDeleteSelector());
-        sortOperateListeners = new LinkedHashMap<>();
-        sortOperateListeners.put(sortConfigListener, new ZookeeperDisabledSelector());
+        sourceOperateListeners = new LinkedList<>();
+        sourceOperateListeners.add(sourceStopListener);
+        sourceOperateListeners.add(sourceDeleteListener);
+        sourceOperateListeners.add(sourceRestartListener);
+        queueOperateListeners = new LinkedList<>();
+        queueOperateListeners.add(createTubeTopicTaskListener);
+        queueOperateListeners.add(createTubeGroupTaskListener);
+        queueOperateListeners.add(createPulsarResourceTaskListener);
+        queueOperateListeners.add(createPulsarGroupTaskListener);
+        queueOperateListeners.add(deletePulsarResourceTaskListener);
+        sortOperateListeners = new LinkedList<>();
+        sortOperateListeners.add(sortConfigListener);
     }
 
-    public void clearListeners() {
-        sourceOperateListeners = new LinkedHashMap<>();
-        queueOperateListeners = new LinkedHashMap<>();
-        sortOperateListeners = new LinkedHashMap<>();
+    @Override
+    public void acceptPlugin(Plugin plugin) {
+        if (!(plugin instanceof ProcessPlugin)) {
+            return;
+        }
+        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
+        List<SourceOperateListener> pluginSourceOperateListeners = processPlugin.createSourceOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginSourceOperateListeners)) {
+            sourceOperateListeners.addAll(pluginSourceOperateListeners); // reuse the list checked above
+        }
+        List<QueueOperateListener> pluginQueueOperateListeners = processPlugin.createQueueOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginQueueOperateListeners)) {
+            queueOperateListeners.addAll(pluginQueueOperateListeners);
+        }
+        List<SortOperateListener> pluginSortOperateListeners = processPlugin.createSortOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginSortOperateListeners)) {
+            sortOperateListeners.addAll(pluginSortOperateListeners);
+        }
     }
 
     @Override
-    public List<TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType serviceTaskType) {
-        switch (serviceTaskType) {
+    public List<? extends TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType taskType) {
+        switch (taskType) {
             case INIT_MQ:
             case DELETE_MQ:
                 List<QueueOperateListener> queueOperateListeners = getQueueOperateListener(workflowContext);
@@ -133,24 +134,32 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
             case STOP_SOURCE:
             case RESTART_SOURCE:
             case DELETE_SOURCE:
-                List<DataSourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
+                List<SourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
                 return Lists.newArrayList(sourceOperateListeners);
             case INIT_SINK:
                 return Collections.singletonList(sinkResourceListener);
             default:
-                throw new IllegalArgumentException(String.format("UnSupport ServiceTaskType %s", serviceTaskType));
+                throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
         }
     }
 
     /**
-     * Get data source operate listener list.
+     * Clear the list of listeners.
+     */
+    public void clearListeners() {
+        sourceOperateListeners = new LinkedList<>();
+        queueOperateListeners = new LinkedList<>();
+        sortOperateListeners = new LinkedList<>();
+    }
+
+    /**
+     * Get stream source operate listener list.
      */
-    public List<DataSourceOperateListener> getSourceOperateListener(WorkflowContext context) {
-        List<DataSourceOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<DataSourceOperateListener, EventSelector> entry : sourceOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+    public List<SourceOperateListener> getSourceOperateListener(WorkflowContext context) {
+        List<SourceOperateListener> listeners = new ArrayList<>();
+        for (SourceOperateListener listener : sourceOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
@@ -161,10 +170,9 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
      */
     public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
         List<QueueOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<QueueOperateListener, EventSelector> entry : queueOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+        for (QueueOperateListener listener : queueOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
@@ -175,36 +183,12 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
      */
     public List<SortOperateListener> getSortOperateListener(WorkflowContext context) {
         List<SortOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<SortOperateListener, EventSelector> entry : sortOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+        for (SortOperateListener listener : sortOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
     }
 
-    @Override
-    public void acceptPlugin(Plugin plugin) {
-        if (!(plugin instanceof ProcessPlugin)) {
-            return;
-        }
-        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
-        Map<DataSourceOperateListener, EventSelector> pluginDsOperateListeners =
-                processPlugin.createSourceOperateListeners();
-        if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
-            sourceOperateListeners.putAll(processPlugin.createSourceOperateListeners());
-        }
-        Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
-                processPlugin.createQueueOperateListeners();
-        if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
-            queueOperateListeners.putAll(pluginQueueOperateListeners);
-        }
-        Map<SortOperateListener, EventSelector> pluginSortOperateListeners =
-                processPlugin.createSortOperateListeners();
-        if (MapUtils.isNotEmpty(pluginSortOperateListeners)) {
-            sortOperateListeners.putAll(pluginSortOperateListeners);
-        }
-    }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
index f556c9b34..b894817eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
@@ -18,26 +18,20 @@
 package org.apache.inlong.manager.service.workflow.listener;
 
 import com.google.common.collect.Lists;
-import lombok.Data;
-import org.apache.commons.collections.MapUtils;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.service.mq.CreatePulsarSubscriptionTaskListener;
 import org.apache.inlong.manager.service.mq.CreatePulsarTopicTaskListener;
 import org.apache.inlong.manager.service.mq.DeletePulsarTopicTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarTopicCreateSelector;
-import org.apache.inlong.manager.service.mq.PulsarTopicDeleteSelector;
 import org.apache.inlong.manager.service.resource.StreamSinkResourceListener;
 import org.apache.inlong.manager.service.sort.StreamSortConfigListener;
-import org.apache.inlong.manager.service.sort.ZookeeperEnabledSelector;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskListenerProvider;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.apache.inlong.manager.workflow.plugin.Plugin;
 import org.apache.inlong.manager.workflow.plugin.PluginBinder;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
@@ -46,21 +40,19 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
-@Data
+/**
+ * The TaskEventListener factory for InlongStream.
+ */
 @Component
-public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListenerProvider {
-
-    private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
-
-    private Map<QueueOperateListener, EventSelector> queueOperateListeners;
-
-    private Map<SortOperateListener, EventSelector> sortOperateListeners;
+public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFactory {
 
-    private Map<SinkOperateListener, EventSelector> sinkOperateListeners;
+    private List<SourceOperateListener> sourceOperateListeners;
+    private List<QueueOperateListener> queueOperateListeners;
+    private List<SortOperateListener> sortOperateListeners;
+    private List<SinkOperateListener> sinkOperateListeners;
 
     @Autowired
     private CreatePulsarTopicTaskListener createPulsarTopicTaskListener;
@@ -75,23 +67,44 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
 
     @PostConstruct
     public void init() {
-        sourceOperateListeners = new LinkedHashMap<>();
-        queueOperateListeners = new LinkedHashMap<>();
-        queueOperateListeners.put(createPulsarTopicTaskListener, new PulsarTopicCreateSelector());
-        queueOperateListeners.put(createPulsarSubscriptionTaskListener, new PulsarTopicCreateSelector());
-        queueOperateListeners.put(deletePulsarTopicTaskListener, new PulsarTopicDeleteSelector());
-        sortOperateListeners = new LinkedHashMap<>();
-        sortOperateListeners.put(streamSortConfigListener, new ZookeeperEnabledSelector());
-        sinkOperateListeners = new LinkedHashMap<>();
-        sinkOperateListeners.put(sinkResourceListener, context -> {
-            ProcessForm processForm = context.getProcessForm();
-            return processForm instanceof StreamResourceProcessForm;
-        });
+        sourceOperateListeners = new LinkedList<>();
+        queueOperateListeners = new LinkedList<>();
+        queueOperateListeners.add(createPulsarTopicTaskListener);
+        queueOperateListeners.add(createPulsarSubscriptionTaskListener);
+        queueOperateListeners.add(deletePulsarTopicTaskListener);
+        sortOperateListeners = new LinkedList<>();
+        sortOperateListeners.add(streamSortConfigListener);
+        sinkOperateListeners = new LinkedList<>();
+        sinkOperateListeners.add(sinkResourceListener);
+    }
+
+    @Override
+    public void acceptPlugin(Plugin plugin) {
+        if (!(plugin instanceof ProcessPlugin)) {
+            return;
+        }
+        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
+        List<SourceOperateListener> pluginSourceOperateListeners = processPlugin.createSourceOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginSourceOperateListeners)) {
+            sourceOperateListeners.addAll(pluginSourceOperateListeners); // reuse the list checked above
+        }
+        List<QueueOperateListener> pluginQueueOperateListeners = processPlugin.createQueueOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginQueueOperateListeners)) {
+            queueOperateListeners.addAll(pluginQueueOperateListeners);
+        }
+        List<SortOperateListener> pluginSortOperateListeners = processPlugin.createSortOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginSortOperateListeners)) {
+            sortOperateListeners.addAll(pluginSortOperateListeners);
+        }
+        List<SinkOperateListener> pluginSinkOperateListeners = processPlugin.createSinkOperateListeners();
+        if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) {
+            sinkOperateListeners.addAll(pluginSinkOperateListeners);
+        }
     }
 
     @Override
-    public Iterable get(WorkflowContext workflowContext, ServiceTaskType serviceTaskType) {
-        switch (serviceTaskType) {
+    public List<? extends TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType taskType) {
+        switch (taskType) {
             case INIT_MQ:
             case DELETE_MQ:
                 List<QueueOperateListener> queueOperateListeners = getQueueOperateListener(workflowContext);
@@ -106,25 +119,34 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
             case STOP_SOURCE:
             case RESTART_SOURCE:
             case DELETE_SOURCE:
-                List<DataSourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
+                List<SourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
                 return Lists.newArrayList(sourceOperateListeners);
             case INIT_SINK:
                 List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext);
                 return Lists.newArrayList(sinkOperateListeners);
             default:
-                throw new IllegalArgumentException(String.format("UnSupport ServiceTaskType %s", serviceTaskType));
+                throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
         }
     }
 
     /**
-     * Get data source operate listener list.
+     * Clear the list of listeners.
      */
-    public List<DataSourceOperateListener> getSourceOperateListener(WorkflowContext context) {
-        List<DataSourceOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<DataSourceOperateListener, EventSelector> entry : sourceOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+    public void clearListeners() {
+        sourceOperateListeners = new LinkedList<>();
+        queueOperateListeners = new LinkedList<>();
+        sortOperateListeners = new LinkedList<>();
+        sinkOperateListeners = new LinkedList<>();
+    }
+
+    /**
+     * Get stream source operate listener list.
+     */
+    public List<SourceOperateListener> getSourceOperateListener(WorkflowContext context) {
+        List<SourceOperateListener> listeners = new ArrayList<>();
+        for (SourceOperateListener listener : sourceOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
@@ -135,10 +157,9 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
      */
     public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
         List<QueueOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<QueueOperateListener, EventSelector> entry : queueOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+        for (QueueOperateListener listener : queueOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
@@ -149,10 +170,9 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
      */
     public List<SortOperateListener> getSortOperateListener(WorkflowContext context) {
         List<SortOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<SortOperateListener, EventSelector> entry : sortOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+        for (SortOperateListener listener : sortOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
@@ -161,41 +181,14 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
     /**
      * Get sink operate listener list.
      */
-    public List<SinkOperateListener> getSinkOperateListener(WorkflowContext context) {
+    private List<SinkOperateListener> getSinkOperateListener(WorkflowContext context) {
         List<SinkOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<SinkOperateListener, EventSelector> entry : sinkOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
+        for (SinkOperateListener listener : sinkOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
             }
         }
         return listeners;
     }
 
-    @Override
-    public void acceptPlugin(Plugin plugin) {
-        if (!(plugin instanceof ProcessPlugin)) {
-            return;
-        }
-        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
-        Map<DataSourceOperateListener, EventSelector> pluginDsOperateListeners =
-                processPlugin.createSourceOperateListeners();
-        if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
-            sourceOperateListeners.putAll(processPlugin.createSourceOperateListeners());
-        }
-        Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
-                processPlugin.createQueueOperateListeners();
-        if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
-            queueOperateListeners.putAll(pluginQueueOperateListeners);
-        }
-        Map<SortOperateListener, EventSelector> pluginSortOperateListeners =
-                processPlugin.createSortOperateListeners();
-        if (MapUtils.isNotEmpty(pluginSortOperateListeners)) {
-            sortOperateListeners.putAll(pluginSortOperateListeners);
-        }
-        Map<SinkOperateListener, EventSelector> pluginSinkOperateListeners = processPlugin.createSinkOperateListeners();
-        if (MapUtils.isNotEmpty(pluginSinkOperateListeners)) {
-            sinkOperateListeners.putAll(pluginSinkOperateListeners);
-        }
-    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 4afeb82e1..0183448ca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -73,32 +73,32 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask initMQTask = new ServiceTask();
         initMQTask.setName("InitMQ");
         initMQTask.setDisplayName("Stream-InitMQ");
-        initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
-        initMQTask.addListenerProvider(streamTaskListenerFactory);
+        initMQTask.setServiceTaskType(ServiceTaskType.INIT_MQ);
+        initMQTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(initMQTask);
 
         // Init Sink
         ServiceTask initSinkTask = new ServiceTask();
         initSinkTask.setName("InitSink");
         initSinkTask.setDisplayName("Stream-InitSink");
-        initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
-        initSinkTask.addListenerProvider(streamTaskListenerFactory);
+        initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
+        initSinkTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(initSinkTask);
 
         // Init Sort
         ServiceTask initSortTask = new ServiceTask();
         initSortTask.setName("InitSort");
         initSortTask.setDisplayName("Stream-InitSort");
-        initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
-        initSortTask.addListenerProvider(streamTaskListenerFactory);
+        initSortTask.setServiceTaskType(ServiceTaskType.INIT_SORT);
+        initSortTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(initSortTask);
 
         // Init Source
         ServiceTask initSourceTask = new ServiceTask();
         initSourceTask.setName("InitSource");
         initSourceTask.setDisplayName("Stream-InitSource");
-        initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
-        initSourceTask.addListenerProvider(streamTaskListenerFactory);
+        initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
+        initSourceTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(initSourceTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index b5919d907..886f35138 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -73,24 +73,24 @@ public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask deleteDataSourceTask = new ServiceTask();
         deleteDataSourceTask.setName("DeleteSource");
         deleteDataSourceTask.setDisplayName("Stream-DeleteSource");
-        deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
-        deleteDataSourceTask.addListenerProvider(streamTaskListenerFactory);
+        deleteDataSourceTask.setServiceTaskType(ServiceTaskType.DELETE_SOURCE);
+        deleteDataSourceTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(deleteDataSourceTask);
 
         // Delete MQ
         ServiceTask deleteMQTask = new ServiceTask();
         deleteMQTask.setName("DeleteMQ");
         deleteMQTask.setDisplayName("Stream-DeleteMQ");
-        deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
-        deleteMQTask.addListenerProvider(streamTaskListenerFactory);
+        deleteMQTask.setServiceTaskType(ServiceTaskType.DELETE_MQ);
+        deleteMQTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(deleteMQTask);
 
         // Delete Sort
         ServiceTask deleteSortTask = new ServiceTask();
         deleteSortTask.setName("DeleteSort");
         deleteSortTask.setDisplayName("Stream-DeleteSort");
-        deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
-        deleteSortTask.addListenerProvider(streamTaskListenerFactory);
+        deleteSortTask.setServiceTaskType(ServiceTaskType.DELETE_SORT);
+        deleteSortTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(deleteSortTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
index 8cd67a943..ca6f8f24b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class RestartStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask restartSortTask = new ServiceTask();
         restartSortTask.setName("RestartSort");
         restartSortTask.setDisplayName("Stream-RestartSort");
-        restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
-        restartSortTask.addListenerProvider(streamTaskListenerFactory);
+        restartSortTask.setServiceTaskType(ServiceTaskType.RESTART_SORT);
+        restartSortTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(restartSortTask);
 
         // Restart Source
         ServiceTask restartDataSourceTask = new ServiceTask();
         restartDataSourceTask.setName("RestartSource");
         restartDataSourceTask.setDisplayName("Stream-RestartSource");
-        restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
-        restartDataSourceTask.addListenerProvider(streamTaskListenerFactory);
+        restartDataSourceTask.setServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+        restartDataSourceTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(restartDataSourceTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
index 4a51b57ec..5b0e9992d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class SuspendStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask stopDataSourceTask = new ServiceTask();
         stopDataSourceTask.setName("StopSource");
         stopDataSourceTask.setDisplayName("Stream-StopSource");
-        stopDataSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
-        stopDataSourceTask.addListenerProvider(streamTaskListenerFactory);
+        stopDataSourceTask.setServiceTaskType(ServiceTaskType.STOP_SOURCE);
+        stopDataSourceTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(stopDataSourceTask);
 
         // Stop Sort
         ServiceTask stopSortTask = new ServiceTask();
         stopSortTask.setName("StopSort");
         stopSortTask.setDisplayName("Stream-StopSort");
-        stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
-        stopSortTask.addListenerProvider(streamTaskListenerFactory);
+        stopSortTask.setServiceTaskType(ServiceTaskType.STOP_SORT);
+        stopSortTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(stopSortTask);
 
         // End node
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
index e65b18471..a41bcbe4d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
@@ -25,8 +25,8 @@ import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Test class for load plugin.
@@ -36,7 +36,7 @@ public class PluginClassLoaderTest {
     @Test
     public void testLoadPlugin() {
 
-        String path = this.getClass().getClassLoader().getResource("").getPath();
+        String path = Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).getPath();
         PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
                 Thread.currentThread().getContextClassLoader());
         Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
@@ -49,11 +49,7 @@ public class PluginClassLoaderTest {
             Class cls = pluginClassLoader.loadClass(pluginClass);
             Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
             Assertions.assertTrue(plugin instanceof ProcessPlugin);
-        } catch (ClassNotFoundException
-                | NoSuchMethodException
-                | InstantiationException
-                | IllegalAccessException
-                | InvocationTargetException e) {
+        } catch (Exception e) {
             Assertions.assertTrue(e instanceof ClassNotFoundException);
             Assertions.fail();
         }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java
index db5128672..5836accd7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.manager.service.mocks;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -32,6 +35,16 @@ public class MockDeleteSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only group-resource forms whose operate type is DELETE.
+        ProcessForm form = context.getProcessForm();
+        if (!(form instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        return ((GroupResourceProcessForm) form).getGroupOperateType() == GroupOperateType.DELETE;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         return ListenerResult.success("Mock delete sort success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java
index b4bd8f2f7..60d3e2b07 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java
@@ -17,21 +17,34 @@
 
 package org.apache.inlong.manager.service.mocks;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 /**
  * Test class for listen delete source event.
  */
-public class MockDeleteSourceListener implements DataSourceOperateListener {
+public class MockDeleteSourceListener implements SourceOperateListener {
 
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only group-resource forms whose operate type is DELETE.
+        ProcessForm form = context.getProcessForm();
+        if (!(form instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        return ((GroupResourceProcessForm) form).getGroupOperateType() == GroupOperateType.DELETE;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         return ListenerResult.success("Mock delete source success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java
index dc8442ea8..a1ac977e6 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java
@@ -17,74 +17,33 @@
 
 package org.apache.inlong.manager.service.mocks;
 
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Test class for process plugin.
  */
 public class MockPlugin implements ProcessPlugin {
 
-    public EventSelector stopProcessSelector = new EventSelector() {
-        @Override
-        public boolean accept(WorkflowContext context) {
-            ProcessForm processForm = context.getProcessForm();
-            if (!(processForm instanceof GroupResourceProcessForm)) {
-                return false;
-            }
-            GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-            return form.getGroupOperateType() == GroupOperateType.SUSPEND;
-        }
-    };
-
-    public EventSelector restartProcessSelector = new EventSelector() {
-        @Override
-        public boolean accept(WorkflowContext context) {
-            ProcessForm processForm = context.getProcessForm();
-            if (!(processForm instanceof GroupResourceProcessForm)) {
-                return false;
-            }
-            GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-            return form.getGroupOperateType() == GroupOperateType.RESTART;
-        }
-    };
-
-    public EventSelector deleteProcessSelector = new EventSelector() {
-        @Override
-        public boolean accept(WorkflowContext context) {
-            ProcessForm processForm = context.getProcessForm();
-            if (!(processForm instanceof GroupResourceProcessForm)) {
-                return false;
-            }
-            GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-            return form.getGroupOperateType() == GroupOperateType.DELETE;
-        }
-    };
-
     @Override
-    public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
-        Map<DataSourceOperateListener, EventSelector> listeners = new HashMap<>();
-        listeners.put(new MockDeleteSourceListener(), deleteProcessSelector);
-        listeners.put(new MockRestartSourceListener(), restartProcessSelector);
-        listeners.put(new MockStopSourceListener(), stopProcessSelector);
+    public List<SourceOperateListener> createSourceOperateListeners() {
+        List<SourceOperateListener> listeners = new ArrayList<>();
+        listeners.add(new MockDeleteSourceListener());
+        listeners.add(new MockRestartSourceListener());
+        listeners.add(new MockStopSourceListener());
         return listeners;
     }
 
     @Override
-    public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
-        Map<SortOperateListener, EventSelector> listeners = new HashMap<>();
-        listeners.put(new MockDeleteSortListener(), deleteProcessSelector);
-        listeners.put(new MockRestartSortListener(), restartProcessSelector);
-        listeners.put(new MockStopSortListener(), stopProcessSelector);
+    public List<SortOperateListener> createSortOperateListeners() {
+        List<SortOperateListener> listeners = new ArrayList<>();
+        listeners.add(new MockDeleteSortListener());
+        listeners.add(new MockRestartSortListener());
+        listeners.add(new MockStopSortListener());
         return listeners;
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java
index 196b10777..8972ab1d7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.manager.service.mocks;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -32,6 +35,16 @@ public class MockRestartSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only group-resource forms whose operate type is RESTART.
+        ProcessForm form = context.getProcessForm();
+        if (!(form instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        return ((GroupResourceProcessForm) form).getGroupOperateType() == GroupOperateType.RESTART;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         return ListenerResult.success("Mock restart sort success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java
index 3ab440090..a6fa79114 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java
@@ -17,21 +17,34 @@
 
 package org.apache.inlong.manager.service.mocks;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 /**
  * Test class for listen restart source event.
  */
-public class MockRestartSourceListener implements DataSourceOperateListener {
+public class MockRestartSourceListener implements SourceOperateListener {
 
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only group-resource forms whose operate type is RESTART.
+        ProcessForm form = context.getProcessForm();
+        if (!(form instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        return ((GroupResourceProcessForm) form).getGroupOperateType() == GroupOperateType.RESTART;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         return ListenerResult.success("Mock restart source success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java
index 496efdc4b..f485c69bf 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.manager.service.mocks;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -32,6 +35,16 @@ public class MockStopSortListener implements SortOperateListener {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only group-resource forms whose operate type is SUSPEND.
+        ProcessForm form = context.getProcessForm();
+        if (!(form instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        return ((GroupResourceProcessForm) form).getGroupOperateType() == GroupOperateType.SUSPEND;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         return ListenerResult.success("Mock stop sort success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java
index 54fec8121..ae554323d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java
@@ -17,21 +17,34 @@
 
 package org.apache.inlong.manager.service.mocks;
 
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 /**
  * Test class for listen stop source event.
  */
-public class MockStopSourceListener implements DataSourceOperateListener {
+public class MockStopSourceListener implements SourceOperateListener {
 
     @Override
     public TaskEvent event() {
         return TaskEvent.COMPLETE;
     }
 
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // Accept only group-resource forms whose operate type is SUSPEND.
+        ProcessForm form = context.getProcessForm();
+        if (!(form instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        return ((GroupResourceProcessForm) form).getGroupOperateType() == GroupOperateType.SUSPEND;
+    }
+
     @Override
     public ListenerResult listen(WorkflowContext context) {
         return ListenerResult.success("Mock stop source success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java
index 33d5ac7ad..46e7a937b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java
@@ -41,7 +41,7 @@ import java.util.List;
 public class GroupTaskListenerFactoryTest extends ServiceBaseTest {
 
     @Autowired
-    GroupTaskListenerFactory groupTaskListenerFactory;
+    private GroupTaskListenerFactory groupTaskListenerFactory;
 
     @Test
     public void testGetQueueOperateListener() {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 5070d3ac5..416fd171d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -44,14 +44,11 @@ import org.apache.inlong.manager.workflow.definition.WorkflowTask;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.apache.inlong.manager.workflow.util.WorkflowBeanUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.List;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -103,31 +100,29 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
      * Mock the task listener factory
      */
     public void mockTaskListenerFactory() {
-        CreateTubeGroupTaskListener createTubeGroupTaskListener = mock(CreateTubeGroupTaskListener.class);
-        when(createTubeGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createTubeGroupTaskListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
-        when(createTubeGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
-        taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupTaskListener);
-
-        CreateTubeTopicTaskListener createTubeTopicTaskListener = mock(CreateTubeTopicTaskListener.class);
-        when(createTubeTopicTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createTubeTopicTaskListener.name()).thenReturn(CreateTubeTopicTaskListener.class.getSimpleName());
-        when(createTubeTopicTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
-        taskListenerFactory.setCreateTubeTopicTaskListener(createTubeTopicTaskListener);
-
-        CreatePulsarResourceTaskListener createPulsarResourceTaskListener = mock(
-                CreatePulsarResourceTaskListener.class);
-        when(createPulsarResourceTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createPulsarResourceTaskListener.name()).thenReturn(
-                CreatePulsarResourceTaskListener.class.getSimpleName());
-        when(createPulsarResourceTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
-        taskListenerFactory.setCreatePulsarResourceTaskListener(createPulsarResourceTaskListener);
-
-        CreatePulsarGroupTaskListener createPulsarGroupTaskListener = mock(CreatePulsarGroupTaskListener.class);
-        when(createPulsarGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createPulsarGroupTaskListener.name()).thenReturn(CreatePulsarGroupTaskListener.class.getSimpleName());
-        when(createPulsarGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
-        taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupTaskListener);
+        CreateTubeGroupTaskListener createTubeGroupListener = mock(CreateTubeGroupTaskListener.class);
+        when(createTubeGroupListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+        when(createTubeGroupListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
+        when(createTubeGroupListener.event()).thenReturn(TaskEvent.COMPLETE);
+        taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupListener);
+
+        CreateTubeTopicTaskListener createTubeTopicListener = mock(CreateTubeTopicTaskListener.class);
+        when(createTubeTopicListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+        when(createTubeTopicListener.name()).thenReturn(CreateTubeTopicTaskListener.class.getSimpleName());
+        when(createTubeTopicListener.event()).thenReturn(TaskEvent.COMPLETE);
+        taskListenerFactory.setCreateTubeTopicTaskListener(createTubeTopicListener);
+
+        CreatePulsarResourceTaskListener createPulsarResourceListener = mock(CreatePulsarResourceTaskListener.class);
+        when(createPulsarResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+        when(createPulsarResourceListener.name()).thenReturn(CreatePulsarResourceTaskListener.class.getSimpleName());
+        when(createPulsarResourceListener.event()).thenReturn(TaskEvent.COMPLETE);
+        taskListenerFactory.setCreatePulsarResourceTaskListener(createPulsarResourceListener);
+
+        CreatePulsarGroupTaskListener createPulsarGroupListener = mock(CreatePulsarGroupTaskListener.class);
+        when(createPulsarGroupListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+        when(createPulsarGroupListener.name()).thenReturn(CreatePulsarGroupTaskListener.class.getSimpleName());
+        when(createPulsarGroupListener.event()).thenReturn(TaskEvent.COMPLETE);
+        taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupListener);
 
         SinkResourceListener sinkResourceListener = mock(SinkResourceListener.class);
         when(sinkResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
@@ -138,7 +133,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
         taskListenerFactory.clearListeners();
         taskListenerFactory.init();
         SortOperateListener mockOperateListener = createMockSortListener();
-        taskListenerFactory.getSortOperateListeners().put(mockOperateListener, context -> true);
+        taskListenerFactory.getSortOperateListeners().add(mockOperateListener);
     }
 
     public SortOperateListener createMockSortListener() {
@@ -169,11 +164,6 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
         WorkflowProcess process = context.getProcess();
         WorkflowTask task = process.getTaskByName("InitMQ");
         Assertions.assertTrue(task instanceof ServiceTask);
-        Assertions.assertEquals(2, task.getNameToListenerMap().size());
-
-        List<TaskEventListener> listeners = Lists.newArrayList(task.getNameToListenerMap().values());
-        Assertions.assertTrue(listeners.get(0) instanceof CreatePulsarGroupTaskListener);
-        Assertions.assertTrue(listeners.get(1) instanceof CreatePulsarResourceTaskListener);
 
         // Integer processId = processResponse.getId();
         // context = processService.continueProcess(processId, applicant, "continue process");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index e428fbe1c..31c74a2cb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -24,7 +24,7 @@ import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
- * Test class for ldefine process.
+ * Test class for workflow define process.
  */
 public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
 
diff --git a/inlong-manager/manager-service/src/test/resources/log4j2.xml b/inlong-manager/manager-service/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..a6a161d3f
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/resources/log4j2.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration status="WARN" monitorInterval="30">
+    <Properties>
+        <property name="basePath">logs</property>
+        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%20.20t] %-30.30C{1.}:%L - %m%n</property>
+        <property name="output_log_level">DEBUG</property>
+        <property name="all_fileName">${basePath}/manager-ut.log</property>
+        <property name="console_print_level">DEBUG</property>
+    </Properties>
+
+    <appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="${log_pattern}"/>
+            <follow>true</follow>
+        </Console>
+        <File name="AllFile" fileName="${all_fileName}">
+            <PatternLayout pattern="${log_pattern}"/>
+        </File>
+    </appenders>
+
+    <loggers>
+        <root level="${output_log_level}">
+            <appender-ref ref="Console"/>
+            <appender-ref ref="AllFile"/>
+        </root>
+    </loggers>
+</configuration>
\ No newline at end of file
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index 91c1060bf..8b784fa30 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.workflow.definition;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -45,14 +43,12 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ServiceTask extends WorkflowTask {
 
-    public static final Gson GSON = new GsonBuilder().create();
-
     private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
             .of(WorkflowAction.COMPLETE, WorkflowAction.CANCEL, WorkflowAction.TERMINATE);
 
     private final AtomicBoolean isInit = new AtomicBoolean(false);
-    private ServiceTaskListenerProvider<TaskEventListener> listenerProvider;
-    private ServiceTaskType serviceTaskType;
+    private TaskListenerFactory listenerFactory;
+    private ServiceTaskType taskType;
 
     @Override
     public WorkflowAction defaultNextAction() {
@@ -91,14 +87,14 @@ public class ServiceTask extends WorkflowTask {
         }
     }
 
-    @SneakyThrows
     @Override
+    @SneakyThrows
     public ServiceTask clone() {
         ServiceTask serviceTask = new ServiceTask();
         serviceTask.setName(this.getName());
         serviceTask.setDisplayName(this.getDisplayName());
-        serviceTask.addServiceTaskType(this.serviceTaskType);
-        serviceTask.addListenerProvider(this.listenerProvider);
+        serviceTask.setServiceTaskType(this.taskType);
+        serviceTask.setListenerFactory(this.listenerFactory);
         Map<WorkflowAction, List<ConditionNextElement>> cloneActionToNextElementMap = Maps.newHashMap();
         this.getActionToNextElementMap().forEach(
                 (k, v) -> cloneActionToNextElementMap.put(k, v.stream().map(ele -> {
@@ -113,27 +109,35 @@ public class ServiceTask extends WorkflowTask {
         return serviceTask;
     }
 
-    public void addListenerProvider(ServiceTaskListenerProvider<TaskEventListener> provider) {
-        this.listenerProvider = provider;
+    /**
+     * Set the listener factory for the Service Task.
+     */
+    public void setListenerFactory(TaskListenerFactory factory) {
+        this.listenerFactory = factory;
     }
 
-    public void addServiceTaskType(ServiceTaskType type) {
-        this.serviceTaskType = type;
+    /**
+     * Set the task type for the Service Task.
+     */
+    public void setServiceTaskType(ServiceTaskType type) {
+        this.taskType = type;
     }
 
+    /**
+     * Initialize the listeners for the current Service Task.
+     */
     public void initListeners(WorkflowContext workflowContext) {
         if (isInit.compareAndSet(false, true)) {
-            if (listenerProvider == null || serviceTaskType == null) {
+            if (listenerFactory == null || taskType == null) {
                 return;
             }
-            List<TaskEventListener> listeners = Lists.newArrayList(
-                    listenerProvider.get(workflowContext, serviceTaskType));
-
+            List<TaskEventListener> listeners = Lists.newArrayList(listenerFactory.get(workflowContext, taskType));
             List<String> listenerNames = listeners.stream().map(EventListener::name).collect(Collectors.toList());
-            log.info("ServiceTask:{} is init for listeners:{}", getName(), GSON.toJson(listenerNames));
+            log.info("ServiceTask: [{}] is init for listeners: {}", getName(), listenerNames);
             addListeners(listeners);
         } else {
-            log.info("ServiceTask:{} is already init", getName());
+            log.warn("ServiceTask [{}] was already init", getName());
         }
     }
+
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskListenerProvider.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/TaskListenerFactory.java
similarity index 68%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskListenerProvider.java
rename to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/TaskListenerFactory.java
index e93c2e4f9..6966b9cc0 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskListenerProvider.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/TaskListenerFactory.java
@@ -20,11 +20,20 @@ package org.apache.inlong.manager.workflow.definition;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 
+import java.util.List;
+
 /**
- * An object capable of providing instances of TaskEventListener
+ * The factory interface of TaskEventListener.
  */
-public interface ServiceTaskListenerProvider<T extends TaskEventListener> {
+public interface TaskListenerFactory {
 
-    Iterable<T> get(WorkflowContext workflowContext, ServiceTaskType serviceTaskType);
+    /**
+     * Get the task event listeners from the given workflow context and the specified task type.
+     *
+     * @param workflowContext the workflow context
+     * @param taskType the task type
+     * @return list of the task event listeners
+     */
+    List<? extends TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType taskType);
 
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java
index 861e07f27..0029d3b5f 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java
@@ -59,7 +59,7 @@ public abstract class WorkflowTask extends NextableElement {
      * Get sync task event listener list.
      */
     public List<TaskEventListener> listeners(TaskEvent taskEvent) {
-        return this.listeners.getOrDefault(taskEvent, TaskEventListener.EMPTY_LIST);
+        return this.listeners.getOrDefault(taskEvent, TaskEventListener.EMPTY_LISTENERS);
     }
 
     /**
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventSelector.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventSelector.java
deleted file mode 100644
index 12ec1b80f..000000000
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventSelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.event;
-
-import org.apache.inlong.manager.workflow.WorkflowContext;
-
-/**
- * a selector allowing to decide which event is selected
- */
-public interface EventSelector {
-
-    EventSelector SELECT_ANY = context -> true;
-
-    boolean accept(WorkflowContext context);
-
-}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java
index 5df99e3cd..3e8f28475 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 
 /**
- * Listener of operate queue.
+ * Listener for Message Queue operations.
  */
 public interface QueueOperateListener extends TaskEventListener {
 
     QueueOperateListener DEFAULT_QUEUE_OPERATE_LISTENER = new QueueOperateListener() {
+
         @Override
         public TaskEvent event() {
             return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java
index 9b6a7a2bc..2a3b711c6 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 
 /**
- * Listener of operate sink.
+ * Listener for Sink operations.
  */
 public interface SinkOperateListener extends TaskEventListener {
 
     SinkOperateListener DEFAULT_SINK_OPERATE_LISTENER = new SinkOperateListener() {
+
         @Override
         public TaskEvent event() {
             return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java
index 23b4585e2..3665069e0 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 
 /**
- * Listener of operate sort.
+ * Listener for Sort operations.
  */
 public interface SortOperateListener extends TaskEventListener {
 
     SortOperateListener DEFAULT_SORT_OPERATE_LISTENER = new SortOperateListener() {
+
         @Override
         public TaskEvent event() {
             return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/DataSourceOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SourceOperateListener.java
similarity index 85%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/DataSourceOperateListener.java
rename to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SourceOperateListener.java
index 69d7e4c79..8abb11f5c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/DataSourceOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SourceOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 
 /**
- * Listener of operate data source.
+ * Listener for Source operations.
  */
-public interface DataSourceOperateListener extends TaskEventListener {
+public interface SourceOperateListener extends TaskEventListener {
+
+    SourceOperateListener DEFAULT_SOURCE_OPERATE_LISTENER = new SourceOperateListener() {
 
-    DataSourceOperateListener DEFAULT_SOURCE_OPERATE_LISTENER = new DataSourceOperateListener() {
         @Override
         public TaskEvent event() {
             return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java
index 179328c91..36347531c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.workflow.event.task;
 
 import com.google.common.collect.Lists;
+import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventListener;
 
 import java.util.List;
@@ -27,6 +28,19 @@ import java.util.List;
  */
 public interface TaskEventListener extends EventListener<TaskEvent> {
 
-    List<TaskEventListener> EMPTY_LIST = Lists.newArrayList();
+    /**
+     * Empty event listeners.
+     */
+    List<TaskEventListener> EMPTY_LISTENERS = Lists.newArrayList();
+
+    /**
+     * Whether the current listener needs to operate the workflow.
+     *
+     * @param context workflow context
+     * @return true if the current listener needs to operate the workflow, false if not
+     */
+    default boolean accept(WorkflowContext context) {
+        return true;
+    }
 
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index 91606db4e..e771db27f 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -17,32 +17,31 @@
 
 package org.apache.inlong.manager.workflow.plugin;
 
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * Interface of process plugin.
  */
 public interface ProcessPlugin extends Plugin {
 
-    default Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
+    default List<SourceOperateListener> createSourceOperateListeners() {
         return null;
     }
 
-    default Map<SinkOperateListener, EventSelector> createSinkOperateListeners() {
+    default List<SinkOperateListener> createSinkOperateListeners() {
         return null;
     }
 
-    default Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
+    default List<QueueOperateListener> createQueueOperateListeners() {
         return null;
     }
 
-    default Map<SortOperateListener, EventSelector> createSortOperateListeners() {
+    default List<SortOperateListener> createSortOperateListeners() {
         return null;
     }