You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/07 13:00:29 UTC
[incubator-inlong] branch master updated: [INLONG-4105][Manager] Refactor the sink workflow and sink resource operator (#4106)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a9a236ffd [INLONG-4105][Manager] Refactor the sink workflow and sink resource operator (#4106)
a9a236ffd is described below
commit a9a236ffd83ae2aaaf71f22f0febbea5bf05d271
Author: healchow <he...@gmail.com>
AuthorDate: Sat May 7 21:00:24 2022 +0800
[INLONG-4105][Manager] Refactor the sink workflow and sink resource operator (#4106)
---
...urceListener.java => SinkResourceListener.java} | 6 +-
.../service/resource/SinkResourceOperator.java | 21 +++++-
.../resource/hive/HiveResourceOperator.java | 2 +-
.../resource/hive/HiveSinkEventSelector.java | 75 ----------------------
.../resource/kafka/KafkaResourceOperator.java | 67 +++++++++++--------
.../workflow/ServiceTaskListenerFactory.java | 46 ++-----------
.../resource/hive/HiveSinkEventSelectorTest.java | 54 ----------------
.../service/workflow/WorkflowServiceImplTest.java | 14 ++--
.../manager/workflow/definition/ServiceTask.java | 16 ++---
9 files changed, 84 insertions(+), 217 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/CreateSinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/CreateSinkResourceListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
index 5f562f32c..f1d9feee9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/CreateSinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
@@ -37,11 +37,11 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * Event listener of create sink resources, such as Hive table, Kafka topics, ES indices, etc.
+ * Event listener of operate sink resources, such as create or update Hive table, Kafka topics, ES indices, etc.
*/
-@Service
@Slf4j
-public class CreateSinkResourceListener implements SinkOperateListener {
+@Service
+public class SinkResourceListener implements SinkOperateListener {
@Autowired
private StreamSinkEntityMapper sinkMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java
index f3bdfd8c1..d8709db02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceOperator.java
@@ -36,6 +36,25 @@ public interface SinkResourceOperator {
* @param groupId The inlong group id.
* @param sinkInfo The sink response info.
*/
- void createSinkResource(String groupId, SinkInfo sinkInfo);
+ default void createSinkResource(String groupId, SinkInfo sinkInfo) {
+ }
+
+ /**
+ * Update sink resource.
+ *
+ * @param groupId The inlong group id.
+ * @param sinkInfo The sink response info.
+ */
+ default void updateSinkResource(String groupId, SinkInfo sinkInfo) {
+ }
+
+ /**
+ * Delete sink resource.
+ *
+ * @param groupId The inlong group id.
+ * @param sinkInfo The sink response info.
+ */
+ default void deleteSinkResource(String groupId, SinkInfo sinkInfo) {
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
index 2c488e208..8d2d1a1f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
@@ -118,7 +118,7 @@ public class HiveResourceOperator implements SinkResourceOperator {
throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
}
- LOGGER.info("success create hive table for data group [" + groupId + "]");
+ LOGGER.info("success to create hive table for group [" + groupId + "]");
}
protected HiveTableQueryBean getTableQueryBean(SinkInfo config, HiveSinkDTO hiveInfo) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelector.java
deleted file mode 100644
index 0b6bcaf89..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelector.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.resource.hive;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Component
-@Slf4j
-public class HiveSinkEventSelector implements EventSelector {
-
- @Autowired
- private StreamSinkService sinkService;
- @Autowired
- private InlongStreamEntityMapper streamMapper;
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
-
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- String groupId = form.getInlongGroupId();
- if (form.getGroupInfo() == null || StringUtils.isEmpty(form.getGroupInfo().getInlongGroupId())) {
- log.info("not add create hive table listener as the info was null for groupId [{}]", groupId);
- return false;
- }
- List<String> streamWithHiveSink = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
- streamMapper.selectByGroupId(groupId)
- .stream()
- .map(InlongStreamEntity::getInlongStreamId)
- .collect(Collectors.toList()));
-
- if (CollectionUtils.isEmpty(streamWithHiveSink)) {
- log.warn("skip to create hive table as no hive sink found for groupId={} streamId={}",
- groupId, form.getInlongStreamId());
- return false;
- }
-
- log.info("add create hive table listener for groupId [{}]", groupId);
- return true;
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java
index 4af1e921d..435e685c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/kafka/KafkaResourceOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkDTO;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.resource.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.kafka.clients.admin.Admin;
@@ -39,6 +40,9 @@ import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
+/**
+ * Kafka resource operator for creating Kafka topic
+ */
@Service
@Slf4j
public class KafkaResourceOperator implements SinkResourceOperator {
@@ -54,17 +58,22 @@ public class KafkaResourceOperator implements SinkResourceOperator {
@Override
public void createSinkResource(String groupId, SinkInfo sinkInfo) {
KafkaSinkDTO kafkaInfo = KafkaSinkDTO.getFromJson(sinkInfo.getExtParams());
- try (Admin admin = getKafkaAdmin(kafkaInfo)) {
- if (needCreateTopic(admin, kafkaInfo)) {
+ String topicName = kafkaInfo.getTopicName();
+ String partitionNum = kafkaInfo.getPartitionNum();
+ Preconditions.checkNotEmpty(topicName, "topic name cannot be empty");
+ Preconditions.checkNotEmpty(partitionNum, "partition cannot be empty");
+
+ try (Admin admin = getKafkaAdmin(kafkaInfo.getBootstrapServers())) {
+ boolean topicExists = isTopicExists(admin, topicName, partitionNum);
+ if (!topicExists) {
CreateTopicsResult result = admin.createTopics(Collections.singleton(
- new NewTopic(kafkaInfo.getTopicName(),
- Optional.of(Integer.parseInt(kafkaInfo.getPartitionNum())),
- Optional.empty())));
- result.values().get(kafkaInfo.getTopicName()).get();
+ new NewTopic(topicName, Optional.of(Integer.parseInt(partitionNum)), Optional.empty())));
+ result.values().get(topicName).get();
}
+
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(),
"create kafka topic success");
- log.info("success create kafka topic {} for data group [{}]", kafkaInfo.getTopicName(), groupId);
+ log.info("success to create kafka topic {} for group [{}]", topicName, groupId);
} catch (Throwable e) {
log.error("create kafka topic error, ", e);
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), e.getMessage());
@@ -72,30 +81,36 @@ public class KafkaResourceOperator implements SinkResourceOperator {
}
}
- private Admin getKafkaAdmin(KafkaSinkDTO kafkaInfo) {
- Properties props = new Properties();
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInfo.getBootstrapServers());
- return Admin.create(props);
- }
-
- private boolean needCreateTopic(Admin admin, KafkaSinkDTO kafkaInfo) throws Exception {
+ /**
+ * Check whether the topic exists in the Kafka MQ
+ */
+ private boolean isTopicExists(Admin admin, String topicName, String partitionNum) throws Exception {
ListTopicsResult listResult = admin.listTopics();
- if (!listResult.namesToListings().get().containsKey(kafkaInfo.getTopicName())) {
- log.debug("kafka topic {} not existed, proceed to create", kafkaInfo.getTopicName());
- return true;
+ if (!listResult.namesToListings().get().containsKey(topicName)) {
+ log.info("kafka topic {} not existed", topicName);
+ return false;
}
- DescribeTopicsResult result = admin.describeTopics(Collections.singletonList(kafkaInfo.getTopicName()));
- TopicDescription desc = result.values().get(kafkaInfo.getTopicName()).get();
- if (desc.partitions().size() != Integer.parseInt(kafkaInfo.getPartitionNum())) {
- String errMsg = String.format(
- "kafka topic %s already existed with partition num %d <> requested partition num %s",
- kafkaInfo.getTopicName(), desc.partitions().size(), kafkaInfo.getPartitionNum());
+
+ DescribeTopicsResult result = admin.describeTopics(Collections.singletonList(topicName));
+ TopicDescription desc = result.values().get(topicName).get();
+ if (desc.partitions().size() != Integer.parseInt(partitionNum)) {
+ String errMsg = String.format("kafka topic %s already exist with partition num=%d, "
+ + "but the requested partition num=%s", topicName, desc.partitions().size(), partitionNum);
log.error(errMsg);
throw new IllegalArgumentException(errMsg);
} else {
- log.debug("kafka topic {} with {} partitions already existed, no need to create",
- kafkaInfo.getTopicName(), kafkaInfo.getPartitionNum());
- return false;
+ log.info("kafka topic {} with {} partitions already existed, no need to create", topicName, partitionNum);
+ return true;
}
}
+
+ /**
+ * Get Kafka admin from the given bootstrap servers
+ */
+ private Admin getKafkaAdmin(String bootstrapServers) {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ return Admin.create(props);
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
index 9643ac37c..8455a628e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.workflow;
import com.google.common.collect.Lists;
-import lombok.Setter;
+import lombok.Data;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
import org.apache.inlong.manager.service.mq.PulsarEventSelector;
import org.apache.inlong.manager.service.mq.TubeEventSelector;
-import org.apache.inlong.manager.service.resource.CreateSinkResourceListener;
+import org.apache.inlong.manager.service.resource.SinkResourceListener;
import org.apache.inlong.manager.service.sort.CreateSortConfigListener;
import org.apache.inlong.manager.service.sort.PushSortConfigListener;
import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
@@ -45,7 +45,6 @@ import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
import org.apache.inlong.manager.workflow.event.EventSelector;
import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.plugin.Plugin;
@@ -56,56 +55,43 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+@Data
@Component
public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskListenerProvider {
private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
- private Map<SinkOperateListener, EventSelector> sinkOperateListeners;
-
private Map<QueueOperateListener, EventSelector> queueOperateListeners;
private Map<SortOperateListener, EventSelector> sortOperateListeners;
@Autowired
- @Setter
private SourceStopListener sourceStopListener;
-
@Autowired
- @Setter
private SourceRestartListener sourceRestartListener;
-
@Autowired
- @Setter
private SourceDeleteListener sourceDeleteListener;
@Autowired
- @Setter
private CreateTubeTopicTaskListener createTubeTopicTaskListener;
@Autowired
- @Setter
private CreateTubeGroupTaskListener createTubeGroupTaskListener;
@Autowired
- @Setter
private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
@Autowired
- @Setter
private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
@Autowired
- @Setter
- private CreateSinkResourceListener createSinkResourceListener;
+ private SinkResourceListener sinkResourceListener;
@Autowired
- @Setter
private PushSortConfigListener pushSortConfigListener;
- @Autowired
private ZookeeperEnabledSelector zookeeperEnabledSelector;
-
@Autowired
private ZookeeperDisabledSelector zookeeperDisabledSelector;
@Autowired
@@ -120,8 +106,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
sourceOperateListeners.put(sourceStopListener, new SourceStopEventSelector());
sourceOperateListeners.put(sourceDeleteListener, new SourceDeleteEventSelector());
sourceOperateListeners.put(sourceRestartListener, new SourceRestartEventSelector());
- sinkOperateListeners = new LinkedHashMap<>();
- sinkOperateListeners.put(createSinkResourceListener, EventSelector.SELECT_ANY);
queueOperateListeners = new LinkedHashMap<>();
queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
@@ -135,7 +119,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
public void clearListeners() {
sourceOperateListeners = new LinkedHashMap<>();
- sinkOperateListeners = new LinkedHashMap<>();
queueOperateListeners = new LinkedHashMap<>();
sortOperateListeners = new LinkedHashMap<>();
}
@@ -159,8 +142,7 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
List<DataSourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
return Lists.newArrayList(sourceOperateListeners);
case INIT_SINK:
- List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext);
- return Lists.newArrayList(sinkOperateListeners);
+ return Collections.singletonList(sinkResourceListener);
default:
throw new IllegalArgumentException(String.format("UnSupport ServiceTaskType %s", serviceTaskType));
}
@@ -177,17 +159,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
return listeners;
}
- public List<SinkOperateListener> getSinkOperateListener(WorkflowContext context) {
- List<SinkOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<SinkOperateListener, EventSelector> entry : sinkOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
- }
- }
- return listeners;
- }
-
public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
List<QueueOperateListener> listeners = new ArrayList<>();
for (Map.Entry<QueueOperateListener, EventSelector> entry : queueOperateListeners.entrySet()) {
@@ -221,11 +192,6 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
sourceOperateListeners.putAll(processPlugin.createSourceOperateListeners());
}
- Map<SinkOperateListener, EventSelector> pluginSinkOperateListeners =
- processPlugin.createSinkOperateListeners();
- if (MapUtils.isNotEmpty(pluginSinkOperateListeners)) {
- sinkOperateListeners.putAll(pluginSinkOperateListeners);
- }
Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
processPlugin.createQueueOperateListeners();
if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelectorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelectorTest.java
deleted file mode 100644
index e5cadbbf1..000000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/hive/HiveSinkEventSelectorTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.resource.hive;
-
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.impl.InlongGroupServiceTest;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.junit.Assert;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class HiveSinkEventSelectorTest extends ServiceBaseTest {
-
- @Autowired
- HiveSinkEventSelector hiveSinkEventSelector;
-
- @Autowired
- private InlongGroupServiceTest groupServiceTest;
-
- // There will be concurrency problems in the overall operation,This method temporarily fails the test
- // @Test
- public void testAccept() {
- WorkflowContext workflowContext = new WorkflowContext();
- GroupResourceProcessForm processForm = new GroupResourceProcessForm();
- String groupName = "hiveGroup";
- String operator = "admin";
- String groupId = groupServiceTest.saveGroup(groupName, operator);
- InlongGroupInfo inlongGroupInfo = groupServiceTest.groupService.get(groupId);
- processForm.setGroupInfo(inlongGroupInfo);
- workflowContext.setProcessForm(processForm);
- Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
- processForm.setGroupInfo(new InlongGroupInfo());
- Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
- processForm.getGroupInfo().setInlongGroupId("test");
- Assert.assertTrue(hiveSinkEventSelector.accept(workflowContext));
- }
-
-}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 1aff4e046..81e392c63 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -48,7 +48,7 @@ import org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
-import org.apache.inlong.manager.service.resource.CreateSinkResourceListener;
+import org.apache.inlong.manager.service.resource.SinkResourceListener;
import org.apache.inlong.manager.service.sort.PushSortConfigListener;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.WorkflowEngine;
@@ -203,7 +203,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
public void mockTaskListenerFactory() {
CreateTubeGroupTaskListener createTubeGroupTaskListener = mock(CreateTubeGroupTaskListener.class);
when(createTubeGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createTubeGroupTaskListener.name()).thenReturn(CreateSinkResourceListener.class.getSimpleName());
+ when(createTubeGroupTaskListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
when(createTubeGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupTaskListener);
@@ -227,11 +227,11 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
when(createPulsarGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupTaskListener);
- CreateSinkResourceListener createSinkResourceListener = mock(CreateSinkResourceListener.class);
- when(createSinkResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createSinkResourceListener.name()).thenReturn(CreateSinkResourceListener.class.getSimpleName());
- when(createSinkResourceListener.event()).thenReturn(TaskEvent.COMPLETE);
- taskListenerFactory.setCreateSinkResourceListener(createSinkResourceListener);
+ SinkResourceListener sinkResourceListener = mock(SinkResourceListener.class);
+ when(sinkResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+ when(sinkResourceListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
+ when(sinkResourceListener.event()).thenReturn(TaskEvent.COMPLETE);
+ taskListenerFactory.setSinkResourceListener(sinkResourceListener);
PushSortConfigListener pushSortConfigListener = mock(PushSortConfigListener.class);
when(pushSortConfigListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index 7ac71c2a7..b110b875b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.workflow.definition;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -32,9 +31,10 @@ import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * System task
+ * Service task workflow
*/
@Slf4j
public class ServiceTask extends WorkflowTask {
@@ -42,12 +42,10 @@ public class ServiceTask extends WorkflowTask {
private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
.of(WorkflowAction.COMPLETE, WorkflowAction.CANCEL, WorkflowAction.TERMINATE);
- private ServiceTaskListenerProvider listenerProvider;
-
+ private final AtomicBoolean isInit = new AtomicBoolean(false);
+ private ServiceTaskListenerProvider<TaskEventListener> listenerProvider;
private ServiceTaskType serviceTaskType;
- private AtomicBoolean isInit = new AtomicBoolean(false);
-
@Override
public WorkflowAction defaultNextAction() {
return WorkflowAction.COMPLETE;
@@ -95,14 +93,12 @@ public class ServiceTask extends WorkflowTask {
return serviceTask;
}
- public WorkflowTask addListenerProvider(ServiceTaskListenerProvider provider) {
+ public void addListenerProvider(ServiceTaskListenerProvider<TaskEventListener> provider) {
this.listenerProvider = provider;
- return this;
}
- public WorkflowTask addServiceTaskType(ServiceTaskType type) {
+ public void addServiceTaskType(ServiceTaskType type) {
this.serviceTaskType = type;
- return this;
}
public void initListeners(WorkflowContext workflowContext) {