You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/07 13:00:29 UTC

[incubator-inlong] branch master updated: [INLONG-4105][Manager] Refactor the sink workflow and sink resource operator (#4106)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a9a236ffd [INLONG-4105][Manager] Refactor the sink workflow and sink resource operator (#4106)
a9a236ffd is described below

commit a9a236ffd83ae2aaaf71f22f0febbea5bf05d271
Author: healchow <he...@gmail.com>
AuthorDate: Sat May 7 21:00:24 2022 +0800

    [INLONG-4105][Manager] Refactor the sink workflow and sink resource operator (#4106)
---
 ...urceListener.java => SinkResourceListener.java} |  6 +-
 .../service/resource/SinkResourceOperator.java     | 21 +++++-
 .../resource/hive/HiveResourceOperator.java        |  2 +-
 .../resource/hive/HiveSinkEventSelector.java       | 75 ----------------------
 .../resource/kafka/KafkaResourceOperator.java      | 67 +++++++++++--------
 .../workflow/ServiceTaskListenerFactory.java       | 46 ++-----------
 .../resource/hive/HiveSinkEventSelectorTest.java   | 54 ----------------
 .../service/workflow/WorkflowServiceImplTest.java  | 14 ++--
 .../manager/workflow/definition/ServiceTask.java   | 16 ++---
 9 files changed, 84 insertions(+), 217 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/CreateSinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/CreateSinkResourceListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
index 5f562f32c..f1d9feee9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/CreateSinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
@@ -37,11 +37,11 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * Event listener of create sink resources, such as Hive table, Kafka topics, ES indices, etc.
+ * Event listener of operate sink resources, such as create or update Hive table, Kafka topics, ES indices, etc.
  */
-@Service
 @Slf4j
-public class CreateSinkResourceListener implements SinkOperateListener {
+@Service
+public class SinkResourceListener implements SinkOperateListener {
 
     @Autowired
     private StreamSinkEntityMapper sinkMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java
index f3bdfd8c1..d8709db02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java
@@ -36,6 +36,25 @@ public interface SinkResourceOperator {
      * @param groupId The inlong group id.
      * @param sinkInfo The sink response info.
      */
-    void createSinkResource(String groupId, SinkInfo sinkInfo);
+    default void createSinkResource(String groupId, SinkInfo sinkInfo) {
+    }
+
+    /**
+     * Update sink resource.
+     *
+     * @param groupId The inlong group id.
+     * @param sinkInfo The sink response info.
+     */
+    default void updateSinkResource(String groupId, SinkInfo sinkInfo) {
+    }
+
+    /**
+     * Delete sink resource.
+     *
+     * @param groupId The inlong group id.
+     * @param sinkInfo The sink response info.
+     */
+    default void deleteSinkResource(String groupId, SinkInfo sinkInfo) {
+    }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
index 2c488e208..8d2d1a1f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
@@ -118,7 +118,7 @@ public class HiveResourceOperator implements SinkResourceOperator {
             throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
         }
 
-        LOGGER.info("success create hive table for data group [" + groupId + "]");
+        LOGGER.info("success to create hive table for group [" + groupId + "]");
     }
 
     protected HiveTableQueryBean getTableQueryBean(SinkInfo config, HiveSinkDTO hiveInfo) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelector.java
deleted file mode 100644
index 0b6bcaf89..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelector.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.resource.hive;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Component
-@Slf4j
-public class HiveSinkEventSelector implements EventSelector {
-
-    @Autowired
-    private StreamSinkService sinkService;
-    @Autowired
-    private InlongStreamEntityMapper streamMapper;
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            return false;
-        }
-
-        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        String groupId = form.getInlongGroupId();
-        if (form.getGroupInfo() == null || StringUtils.isEmpty(form.getGroupInfo().getInlongGroupId())) {
-            log.info("not add create hive table listener as the info was null for groupId [{}]", groupId);
-            return false;
-        }
-        List<String> streamWithHiveSink = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
-                streamMapper.selectByGroupId(groupId)
-                        .stream()
-                        .map(InlongStreamEntity::getInlongStreamId)
-                        .collect(Collectors.toList()));
-
-        if (CollectionUtils.isEmpty(streamWithHiveSink)) {
-            log.warn("skip to create hive table as no hive sink found for groupId={} streamId={}",
-                    groupId, form.getInlongStreamId());
-            return false;
-        }
-
-        log.info("add create hive table listener for groupId [{}]", groupId);
-        return true;
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java
index 4af1e921d..435e685c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
 import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkDTO;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.resource.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.kafka.clients.admin.Admin;
@@ -39,6 +40,9 @@ import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
 
+/**
+ * Kafka resource operator for creating Kafka topic
+ */
 @Service
 @Slf4j
 public class KafkaResourceOperator implements SinkResourceOperator {
@@ -54,17 +58,22 @@ public class KafkaResourceOperator implements SinkResourceOperator {
     @Override
     public void createSinkResource(String groupId, SinkInfo sinkInfo) {
         KafkaSinkDTO kafkaInfo = KafkaSinkDTO.getFromJson(sinkInfo.getExtParams());
-        try (Admin admin = getKafkaAdmin(kafkaInfo)) {
-            if (needCreateTopic(admin, kafkaInfo)) {
+        String topicName = kafkaInfo.getTopicName();
+        String partitionNum = kafkaInfo.getPartitionNum();
+        Preconditions.checkNotEmpty(topicName, "topic name cannot be empty");
+        Preconditions.checkNotEmpty(partitionNum, "partition cannot be empty");
+
+        try (Admin admin = getKafkaAdmin(kafkaInfo.getBootstrapServers())) {
+            boolean topicExists = isTopicExists(admin, topicName, partitionNum);
+            if (!topicExists) {
                 CreateTopicsResult result = admin.createTopics(Collections.singleton(
-                        new NewTopic(kafkaInfo.getTopicName(),
-                                Optional.of(Integer.parseInt(kafkaInfo.getPartitionNum())),
-                                Optional.empty())));
-                result.values().get(kafkaInfo.getTopicName()).get();
+                        new NewTopic(topicName, Optional.of(Integer.parseInt(partitionNum)), Optional.empty())));
+                result.values().get(topicName).get();
             }
+
             sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(),
                     "create kafka topic success");
-            log.info("success create kafka topic {} for data group [{}]", kafkaInfo.getTopicName(), groupId);
+            log.info("success to create kafka topic {} for group [{}]", topicName, groupId);
         } catch (Throwable e) {
             log.error("create kafka topic error, ", e);
             sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), e.getMessage());
@@ -72,30 +81,36 @@ public class KafkaResourceOperator implements SinkResourceOperator {
         }
     }
 
-    private Admin getKafkaAdmin(KafkaSinkDTO kafkaInfo) {
-        Properties props = new Properties();
-        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInfo.getBootstrapServers());
-        return Admin.create(props);
-    }
-
-    private boolean needCreateTopic(Admin admin, KafkaSinkDTO kafkaInfo) throws Exception {
+    /**
+     * Check whether the topic exists in the Kafka MQ
+     */
+    private boolean isTopicExists(Admin admin, String topicName, String partitionNum) throws Exception {
         ListTopicsResult listResult = admin.listTopics();
-        if (!listResult.namesToListings().get().containsKey(kafkaInfo.getTopicName())) {
-            log.debug("kafka topic {} not existed, proceed to create", kafkaInfo.getTopicName());
-            return true;
+        if (!listResult.namesToListings().get().containsKey(topicName)) {
+            log.info("kafka topic {} not existed", topicName);
+            return false;
         }
-        DescribeTopicsResult result = admin.describeTopics(Collections.singletonList(kafkaInfo.getTopicName()));
-        TopicDescription desc = result.values().get(kafkaInfo.getTopicName()).get();
-        if (desc.partitions().size() != Integer.parseInt(kafkaInfo.getPartitionNum())) {
-            String errMsg = String.format(
-                    "kafka topic %s already existed with partition num %d <> requested partition num %s",
-                    kafkaInfo.getTopicName(), desc.partitions().size(), kafkaInfo.getPartitionNum());
+
+        DescribeTopicsResult result = admin.describeTopics(Collections.singletonList(topicName));
+        TopicDescription desc = result.values().get(topicName).get();
+        if (desc.partitions().size() != Integer.parseInt(partitionNum)) {
+            String errMsg = String.format("kafka topic %s already exist with partition num=%d, "
+                    + "but the requested partition num=%s", topicName, desc.partitions().size(), partitionNum);
             log.error(errMsg);
             throw new IllegalArgumentException(errMsg);
         } else {
-            log.debug("kafka topic {} with {} partitions already existed, no need to create",
-                    kafkaInfo.getTopicName(), kafkaInfo.getPartitionNum());
-            return false;
+            log.info("kafka topic {} with {} partitions already existed, no need to create", topicName, partitionNum);
+            return true;
         }
     }
+
+    /**
+     * Get Kafka admin from the given bootstrap servers
+     */
+    private Admin getKafkaAdmin(String bootstrapServers) {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return Admin.create(props);
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
index 9643ac37c..8455a628e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.workflow;
 
 import com.google.common.collect.Lists;
-import lombok.Setter;
+import lombok.Data;
 import org.apache.commons.collections.MapUtils;
 import org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
 import org.apache.inlong.manager.service.mq.PulsarEventSelector;
 import org.apache.inlong.manager.service.mq.TubeEventSelector;
-import org.apache.inlong.manager.service.resource.CreateSinkResourceListener;
+import org.apache.inlong.manager.service.resource.SinkResourceListener;
 import org.apache.inlong.manager.service.sort.CreateSortConfigListener;
 import org.apache.inlong.manager.service.sort.PushSortConfigListener;
 import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
@@ -45,7 +45,6 @@ import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.apache.inlong.manager.workflow.plugin.Plugin;
@@ -56,56 +55,43 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+@Data
 @Component
 public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskListenerProvider {
 
     private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
 
-    private Map<SinkOperateListener, EventSelector> sinkOperateListeners;
-
     private Map<QueueOperateListener, EventSelector> queueOperateListeners;
 
     private Map<SortOperateListener, EventSelector> sortOperateListeners;
 
     @Autowired
-    @Setter
     private SourceStopListener sourceStopListener;
-
     @Autowired
-    @Setter
     private SourceRestartListener sourceRestartListener;
-
     @Autowired
-    @Setter
     private SourceDeleteListener sourceDeleteListener;
 
     @Autowired
-    @Setter
     private CreateTubeTopicTaskListener createTubeTopicTaskListener;
     @Autowired
-    @Setter
     private CreateTubeGroupTaskListener createTubeGroupTaskListener;
     @Autowired
-    @Setter
     private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
     @Autowired
-    @Setter
     private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
 
     @Autowired
-    @Setter
-    private CreateSinkResourceListener createSinkResourceListener;
+    private SinkResourceListener sinkResourceListener;
 
     @Autowired
-    @Setter
     private PushSortConfigListener pushSortConfigListener;
-    @Autowired
     private ZookeeperEnabledSelector zookeeperEnabledSelector;
-
     @Autowired
     private ZookeeperDisabledSelector zookeeperDisabledSelector;
     @Autowired
@@ -120,8 +106,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
         sourceOperateListeners.put(sourceStopListener, new SourceStopEventSelector());
         sourceOperateListeners.put(sourceDeleteListener, new SourceDeleteEventSelector());
         sourceOperateListeners.put(sourceRestartListener, new SourceRestartEventSelector());
-        sinkOperateListeners = new LinkedHashMap<>();
-        sinkOperateListeners.put(createSinkResourceListener, EventSelector.SELECT_ANY);
         queueOperateListeners = new LinkedHashMap<>();
         queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
         queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
@@ -135,7 +119,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
 
     public void clearListeners() {
         sourceOperateListeners = new LinkedHashMap<>();
-        sinkOperateListeners = new LinkedHashMap<>();
         queueOperateListeners = new LinkedHashMap<>();
         sortOperateListeners = new LinkedHashMap<>();
     }
@@ -159,8 +142,7 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
                 List<DataSourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
                 return Lists.newArrayList(sourceOperateListeners);
             case INIT_SINK:
-                List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext);
-                return Lists.newArrayList(sinkOperateListeners);
+                return Collections.singletonList(sinkResourceListener);
             default:
                 throw new IllegalArgumentException(String.format("UnSupport ServiceTaskType %s", serviceTaskType));
         }
@@ -177,17 +159,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
         return listeners;
     }
 
-    public List<SinkOperateListener> getSinkOperateListener(WorkflowContext context) {
-        List<SinkOperateListener> listeners = new ArrayList<>();
-        for (Map.Entry<SinkOperateListener, EventSelector> entry : sinkOperateListeners.entrySet()) {
-            EventSelector selector = entry.getValue();
-            if (selector != null && selector.accept(context)) {
-                listeners.add(entry.getKey());
-            }
-        }
-        return listeners;
-    }
-
     public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
         List<QueueOperateListener> listeners = new ArrayList<>();
         for (Map.Entry<QueueOperateListener, EventSelector> entry : queueOperateListeners.entrySet()) {
@@ -221,11 +192,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
         if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
             sourceOperateListeners.putAll(processPlugin.createSourceOperateListeners());
         }
-        Map<SinkOperateListener, EventSelector> pluginSinkOperateListeners =
-                processPlugin.createSinkOperateListeners();
-        if (MapUtils.isNotEmpty(pluginSinkOperateListeners)) {
-            sinkOperateListeners.putAll(pluginSinkOperateListeners);
-        }
         Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
                 processPlugin.createQueueOperateListeners();
         if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelectorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelectorTest.java
deleted file mode 100644
index e5cadbbf1..000000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelectorTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.resource.hive;
-
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.impl.InlongGroupServiceTest;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.junit.Assert;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class HiveSinkEventSelectorTest extends ServiceBaseTest {
-
-    @Autowired
-    HiveSinkEventSelector hiveSinkEventSelector;
-
-    @Autowired
-    private InlongGroupServiceTest groupServiceTest;
-
-    // There will be concurrency problems in the overall operation,This method temporarily fails the test
-    // @Test
-    public void testAccept() {
-        WorkflowContext workflowContext = new WorkflowContext();
-        GroupResourceProcessForm processForm = new GroupResourceProcessForm();
-        String groupName = "hiveGroup";
-        String operator = "admin";
-        String groupId = groupServiceTest.saveGroup(groupName, operator);
-        InlongGroupInfo inlongGroupInfo = groupServiceTest.groupService.get(groupId);
-        processForm.setGroupInfo(inlongGroupInfo);
-        workflowContext.setProcessForm(processForm);
-        Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
-        processForm.setGroupInfo(new InlongGroupInfo());
-        Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
-        processForm.getGroupInfo().setInlongGroupId("test");
-        Assert.assertTrue(hiveSinkEventSelector.accept(workflowContext));
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 1aff4e046..81e392c63 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -48,7 +48,7 @@ import org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
-import org.apache.inlong.manager.service.resource.CreateSinkResourceListener;
+import org.apache.inlong.manager.service.resource.SinkResourceListener;
 import org.apache.inlong.manager.service.sort.PushSortConfigListener;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.core.WorkflowEngine;
@@ -203,7 +203,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
     public void mockTaskListenerFactory() {
         CreateTubeGroupTaskListener createTubeGroupTaskListener = mock(CreateTubeGroupTaskListener.class);
         when(createTubeGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createTubeGroupTaskListener.name()).thenReturn(CreateSinkResourceListener.class.getSimpleName());
+        when(createTubeGroupTaskListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
         when(createTubeGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
         taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupTaskListener);
 
@@ -227,11 +227,11 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
         when(createPulsarGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
         taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupTaskListener);
 
-        CreateSinkResourceListener createSinkResourceListener = mock(CreateSinkResourceListener.class);
-        when(createSinkResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createSinkResourceListener.name()).thenReturn(CreateSinkResourceListener.class.getSimpleName());
-        when(createSinkResourceListener.event()).thenReturn(TaskEvent.COMPLETE);
-        taskListenerFactory.setCreateSinkResourceListener(createSinkResourceListener);
+        SinkResourceListener sinkResourceListener = mock(SinkResourceListener.class);
+        when(sinkResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+        when(sinkResourceListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
+        when(sinkResourceListener.event()).thenReturn(TaskEvent.COMPLETE);
+        taskListenerFactory.setSinkResourceListener(sinkResourceListener);
 
         PushSortConfigListener pushSortConfigListener = mock(PushSortConfigListener.class);
         when(pushSortConfigListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index 7ac71c2a7..b110b875b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.workflow.definition;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -32,9 +31,10 @@ import org.springframework.util.CollectionUtils;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * System task
+ * Service task workflow
  */
 @Slf4j
 public class ServiceTask extends WorkflowTask {
@@ -42,12 +42,10 @@ public class ServiceTask extends WorkflowTask {
     private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
             .of(WorkflowAction.COMPLETE, WorkflowAction.CANCEL, WorkflowAction.TERMINATE);
 
-    private ServiceTaskListenerProvider listenerProvider;
-
+    private final AtomicBoolean isInit = new AtomicBoolean(false);
+    private ServiceTaskListenerProvider<TaskEventListener> listenerProvider;
     private ServiceTaskType serviceTaskType;
 
-    private AtomicBoolean isInit = new AtomicBoolean(false);
-
     @Override
     public WorkflowAction defaultNextAction() {
         return WorkflowAction.COMPLETE;
@@ -95,14 +93,12 @@ public class ServiceTask extends WorkflowTask {
         return serviceTask;
     }
 
-    public WorkflowTask addListenerProvider(ServiceTaskListenerProvider provider) {
+    public void addListenerProvider(ServiceTaskListenerProvider<TaskEventListener> provider) {
         this.listenerProvider = provider;
-        return this;
     }
 
-    public WorkflowTask addServiceTaskType(ServiceTaskType type) {
+    public void addServiceTaskType(ServiceTaskType type) {
         this.serviceTaskType = type;
-        return this;
     }
 
     public void initListeners(WorkflowContext workflowContext) {