You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/27 10:48:01 UTC
[inlong] branch master updated: [INLONG-5212][Manager] Merge the Selector classes into their related Listeners (#5220)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cec55445a [INLONG-5212][Manager] Merge the Selector classes into their related Listeners (#5220)
cec55445a is described below
commit cec55445a1c97027a550706c36e68ff6c636ffd4
Author: healchow <he...@gmail.com>
AuthorDate: Wed Jul 27 18:47:57 2022 +0800
[INLONG-5212][Manager] Merge the Selector classes into their related Listeners (#5220)
---
.../manager/plugin/FlinkSortProcessPlugin.java | 39 ++---
.../plugin/eventselect/DeleteProcessSelector.java | 55 -------
.../plugin/eventselect/DeleteStreamSelector.java | 57 --------
.../plugin/eventselect/RestartProcessSelector.java | 55 -------
.../plugin/eventselect/RestartStreamSelector.java | 57 --------
.../plugin/eventselect/StartupProcessSelector.java | 54 -------
.../plugin/eventselect/StartupStreamSelector.java | 55 -------
.../plugin/eventselect/SuspendProcessSelector.java | 55 -------
.../plugin/eventselect/SuspendStreamSelector.java | 55 -------
.../plugin/listener/DeleteSortListener.java | 22 ++-
.../plugin/listener/DeleteStreamListener.java | 26 +++-
.../plugin/listener/RestartSortListener.java | 20 +++
.../plugin/listener/RestartStreamListener.java | 22 +++
.../plugin/listener/StartupSortListener.java | 19 +++
.../plugin/listener/StartupStreamListener.java | 24 +++-
.../plugin/listener/SuspendSortListener.java | 22 ++-
.../plugin/listener/SuspendStreamListener.java | 22 +++
.../plugin/listener/StartupSortListenerTest.java | 8 +-
.../plugin/listener/SuspendSortListenerTest.java | 6 +-
.../service/mq/CreatePulsarGroupTaskListener.java | 36 ++++-
.../mq/CreatePulsarResourceTaskListener.java | 35 ++++-
.../mq/CreatePulsarSubscriptionTaskListener.java | 37 +++++
.../service/mq/CreatePulsarTopicTaskListener.java | 35 +++++
.../service/mq/CreateTubeGroupTaskListener.java | 20 +++
.../service/mq/CreateTubeTopicTaskListener.java | 20 +++
.../mq/DeletePulsarResourceTaskListener.java | 34 +++++
.../service/mq/DeletePulsarTopicTaskListener.java | 35 +++++
.../service/mq/PulsarResourceCreateSelector.java | 67 ---------
.../service/mq/PulsarResourceDeleteSelector.java | 66 ---------
.../service/mq/PulsarTopicCreateSelector.java | 63 --------
.../service/mq/PulsarTopicDeleteSelector.java | 62 --------
.../manager/service/mq/TubeEventSelector.java | 50 -------
.../resource/StreamSinkResourceListener.java | 7 +
.../manager/service/sort/SortConfigListener.java | 31 ++++
.../service/sort/StreamSortConfigListener.java | 21 +++
.../service/sort/ZookeeperDisabledSelector.java | 63 --------
.../service/sort/ZookeeperEnabledSelector.java | 53 -------
.../listener/AbstractSourceOperateListener.java | 6 +-
.../source/listener/SourceDeleteEventSelector.java | 40 ------
.../source/listener/SourceDeleteListener.java | 14 ++
.../listener/SourceRestartEventSelector.java | 40 ------
.../source/listener/SourceRestartListener.java | 14 ++
.../source/listener/SourceStopEventSelector.java | 40 ------
.../source/listener/SourceStopListener.java | 14 ++
.../listener/ConsumptionPassTaskListener.java | 2 +-
.../group/CreateGroupWorkflowDefinition.java | 18 +--
.../group/DeleteGroupWorkflowDefinition.java | 12 +-
.../group/RestartGroupWorkflowDefinition.java | 8 +-
.../group/SuspendGroupWorkflowDefinition.java | 8 +-
.../group/listener/InitGroupCompleteListener.java | 10 +-
.../listener/GroupTaskListenerFactory.java | 144 +++++++++----------
.../listener/StreamTaskListenerFactory.java | 159 ++++++++++-----------
.../stream/CreateStreamWorkflowDefinition.java | 16 +--
.../stream/DeleteStreamWorkflowDefinition.java | 12 +-
.../stream/RestartStreamWorkflowDefinition.java | 8 +-
.../stream/SuspendStreamWorkflowDefinition.java | 8 +-
.../service/core/plugin/PluginClassLoaderTest.java | 10 +-
.../service/mocks/MockDeleteSortListener.java | 13 ++
.../service/mocks/MockDeleteSourceListener.java | 17 ++-
.../inlong/manager/service/mocks/MockPlugin.java | 67 ++-------
.../service/mocks/MockRestartSortListener.java | 13 ++
.../service/mocks/MockRestartSourceListener.java | 17 ++-
.../service/mocks/MockStopSortListener.java | 13 ++
.../service/mocks/MockStopSourceListener.java | 17 ++-
.../workflow/GroupTaskListenerFactoryTest.java | 2 +-
.../service/workflow/WorkflowServiceImplTest.java | 58 ++++----
.../group/CreateGroupWorkflowDefinitionTest.java | 2 +-
.../manager-service/src/test/resources/log4j2.xml | 46 ++++++
.../manager/workflow/definition/ServiceTask.java | 42 +++---
...tenerProvider.java => TaskListenerFactory.java} | 15 +-
.../manager/workflow/definition/WorkflowTask.java | 2 +-
.../manager/workflow/event/EventSelector.java | 31 ----
.../workflow/event/task/QueueOperateListener.java | 3 +-
.../workflow/event/task/SinkOperateListener.java | 3 +-
.../workflow/event/task/SortOperateListener.java | 3 +-
...ateListener.java => SourceOperateListener.java} | 7 +-
.../workflow/event/task/TaskEventListener.java | 16 ++-
.../manager/workflow/plugin/ProcessPlugin.java | 15 +-
78 files changed, 982 insertions(+), 1411 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index 45cf2b1e2..896b770d4 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -18,14 +18,6 @@
package org.apache.inlong.manager.plugin;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.plugin.eventselect.DeleteProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.DeleteStreamSelector;
-import org.apache.inlong.manager.plugin.eventselect.RestartProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.RestartStreamSelector;
-import org.apache.inlong.manager.plugin.eventselect.StartupProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.StartupStreamSelector;
-import org.apache.inlong.manager.plugin.eventselect.SuspendProcessSelector;
-import org.apache.inlong.manager.plugin.eventselect.SuspendStreamSelector;
import org.apache.inlong.manager.plugin.listener.DeleteSortListener;
import org.apache.inlong.manager.plugin.listener.DeleteStreamListener;
import org.apache.inlong.manager.plugin.listener.RestartSortListener;
@@ -34,13 +26,12 @@ import org.apache.inlong.manager.plugin.listener.StartupSortListener;
import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
/**
* Plugin of flink sort process.
@@ -49,21 +40,21 @@ import java.util.Map;
public class FlinkSortProcessPlugin implements ProcessPlugin {
@Override
- public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
- return new LinkedHashMap<>();
+ public List<SourceOperateListener> createSourceOperateListeners() {
+ return new LinkedList<>();
}
@Override
- public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
- Map<SortOperateListener, EventSelector> listeners = new LinkedHashMap<>();
- listeners.put(new DeleteSortListener(), new DeleteProcessSelector());
- listeners.put(new RestartSortListener(), new RestartProcessSelector());
- listeners.put(new SuspendSortListener(), new SuspendProcessSelector());
- listeners.put(new StartupSortListener(), new StartupProcessSelector());
- listeners.put(new DeleteStreamListener(), new DeleteStreamSelector());
- listeners.put(new RestartStreamListener(), new RestartStreamSelector());
- listeners.put(new SuspendStreamListener(), new SuspendStreamSelector());
- listeners.put(new StartupStreamListener(), new StartupStreamSelector());
+ public List<SortOperateListener> createSortOperateListeners() {
+ List<SortOperateListener> listeners = new LinkedList<>();
+ listeners.add(new DeleteSortListener());
+ listeners.add(new RestartSortListener());
+ listeners.add(new SuspendSortListener());
+ listeners.add(new StartupSortListener());
+ listeners.add(new DeleteStreamListener());
+ listeners.add(new RestartStreamListener());
+ listeners.add(new SuspendStreamListener());
+ listeners.add(new StartupStreamListener());
return listeners;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
deleted file mode 100644
index d2b9a3dd8..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of delete process event.
- */
-@Slf4j
-public class DeleteProcessSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add deleteProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
-
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
- if (!flag) {
- log.info("not add deleteProcess listener, as the operate was not DELETE for groupId [{}]", groupId);
- return false;
- }
-
- log.info("add deleteProcess listener for groupId [{}]", groupId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
deleted file mode 100644
index 35e1543d0..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of delete stream event.
- */
-@Slf4j
-public class DeleteStreamSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add deleteStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
-
- StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
- String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
- boolean flag = streamResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
- if (!flag) {
- log.info("not add deleteStream listener, as the operate was not DELETE for groupId [{}] and streamId [{}]",
- groupId, streamId);
- return false;
- }
-
- log.info("add deleteStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
deleted file mode 100644
index fa70bb88c..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of restart process event.
- */
-@Slf4j
-public class RestartProcessSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add restartProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
-
- GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
- if (!flag) {
- log.info("\"not add restartProcess listener, as the operate was not RESTART for groupId [{}]", groupId);
- return false;
- }
-
- log.info("add restartProcess listener for groupId [{}]", groupId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
deleted file mode 100644
index 7492dcd46..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of restart stream event.
- */
-@Slf4j
-public class RestartStreamSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add restartStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
-
- StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
- String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
- boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
- if (!flag) {
- log.info("not add restartStream listener, as the operate was not RESTART for groupId [{}] streamId [{}]",
- groupId, streamId);
- return false;
- }
-
- log.info("add restartStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
deleted file mode 100644
index 2c4a93bb8..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of startup process event.
- */
-@Slf4j
-public class StartupProcessSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add startupProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
- GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.INIT;
- if (!flag) {
- log.info("not add startupProcess listener, as the operate was not INIT for groupId [{}]", groupId);
- return false;
- }
-
- log.info("add startupProcess listener for groupId [{}]", groupId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
deleted file mode 100644
index 20dc1c9c9..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of startup stream event.
- */
-@Slf4j
-public class StartupStreamSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add startupStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
- StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
- boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.INIT;
- String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
- if (!flag) {
- log.info("not add startupStream listener, as the operate was not INIT for groupId [{}] and streamId [{}]",
- groupId, streamId);
- return false;
- }
- log.info("add startupStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
deleted file mode 100644
index 85b33096d..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of suspend process event.
- */
-@Slf4j
-public class SuspendProcessSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.info("not add suspendProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
- groupId);
- return false;
- }
-
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
- if (!flag) {
- log.info("not add suspendProcess listener, as the operate was not SUSPEND for groupId [{}]", groupId);
- return false;
- }
-
- log.info("add suspendProcess listener for groupId [{}]", groupId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
deleted file mode 100644
index 126f507ee..000000000
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.plugin.eventselect;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of startup stream event.
- */
-@Slf4j
-public class SuspendStreamSelector implements EventSelector {
-
- @SneakyThrows
- @Override
- public boolean accept(WorkflowContext workflowContext) {
- ProcessForm processForm = workflowContext.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- log.info("not add suspendStream listener as StreamResourceProcessForm for groupId [{}]", groupId);
- return false;
- }
- StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
- String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
- boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
- if (!flag) {
- log.info("not add suspendStream listener as the operate SUSPEND for groupId [{}] and streamId [{}]",
- groupId, streamId);
- return false;
- }
-
- log.info("add suspendStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
- return true;
- }
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index c28ddd349..60569c3ca 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -21,11 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -53,6 +54,25 @@ public class DeleteSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ log.info("not add delete group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+ if (groupProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
+ log.info("not add delete group listener, as the operate was not DELETE for groupId [{}]", groupId);
+ return false;
+ }
+
+ log.info("add delete group listener for groupId [{}]", groupId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 188d3bbd7..49350a720 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -21,13 +21,14 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -55,6 +56,27 @@ public class DeleteStreamListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add delete stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ if (streamProcessForm.getGroupOperateType() != GroupOperateType.DELETE) {
+ log.info("not add delete stream listener, as the operate was not DELETE for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+
+ log.info("add delete stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
@@ -69,7 +91,7 @@ public class DeleteStreamListener implements SortOperateListener {
final String streamId = streamInfo.getInlongStreamId();
Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
- streamExtList.stream().forEach(extInfo -> {
+ streamExtList.forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 6ef6f2a6c..1ba7fdd69 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -54,6 +55,25 @@ public class RestartSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext workflowContext) {
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ log.info("not add restart group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+ if (groupProcessForm.getGroupOperateType() != GroupOperateType.RESTART) {
+ log.info("not add restart group listener, as the operate was not RESTART for groupId [{}]", groupId);
+ return false;
+ }
+
+ log.info("add restart group listener for groupId [{}]", groupId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 157153e32..cfc76da5e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -56,6 +57,27 @@ public class RestartStreamListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext workflowContext) {
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add restart stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ if (streamProcessForm.getGroupOperateType() != GroupOperateType.RESTART) {
+ log.info("not add restart stream listener, as the operate was not RESTART for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+
+ log.info("add restart stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 5c80896a7..5fdbde1a2 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -55,6 +56,24 @@ public class StartupSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext workflowContext) {
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ log.info("not add startup group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+ GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+ if (groupProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
+ log.info("not add startup group listener, as the operate was not INIT for groupId [{}]", groupId);
+ return false;
+ }
+
+ log.info("add startup group listener for groupId [{}]", groupId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index adbbf11ef..40856aa82 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -57,6 +58,27 @@ public class StartupStreamListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext workflowContext) {
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add startup stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
+ log.info("not add startup stream listener, as the operate was not INIT for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+
+ log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
@@ -77,7 +99,7 @@ public class StartupStreamListener implements SortOperateListener {
Map<String, String> kvConf = groupExtList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
- streamExtList.stream().forEach(extInfo -> {
+ streamExtList.forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index ddf1a5cc7..c55973694 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -21,11 +21,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -53,6 +54,25 @@ public class SuspendSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext workflowContext) {
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ log.info("not add suspend group listener, not GroupResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+ if (groupProcessForm.getGroupOperateType() != GroupOperateType.SUSPEND) {
+ log.info("not add suspend group listener, as the operate was not SUSPEND for groupId [{}]", groupId);
+ return false;
+ }
+
+ log.info("add suspend group listener for groupId [{}]", groupId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index a0867c168..19a226b6b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
@@ -55,6 +56,27 @@ public class SuspendStreamListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext workflowContext) {
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add suspend stream listener, not StreamResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+ String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+ if (streamProcessForm.getGroupOperateType() != GroupOperateType.SUSPEND) {
+ log.info("not add suspend stream listener as the operate SUSPEND for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+
+ log.info("add suspend stream listener for groupId [{}] streamId [{}]", groupId, streamId);
+ return true;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
ProcessForm processForm = context.getProcessForm();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
index c47f5f71c..e79171afc 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
@@ -37,14 +37,12 @@ public class StartupSortListenerTest {
@Test
public void testListener() throws Exception {
-
WorkflowContext context = new WorkflowContext();
- GroupResourceProcessForm groupResourceProcessForm = new GroupResourceProcessForm();
-
- context.setProcessForm(groupResourceProcessForm);
+ GroupResourceProcessForm groupResourceForm = new GroupResourceProcessForm();
+ context.setProcessForm(groupResourceForm);
InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
pulsarInfo.setInlongGroupId("1");
- groupResourceProcessForm.setGroupInfo(pulsarInfo);
+ groupResourceForm.setGroupInfo(pulsarInfo);
List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index 087481ece..7d4f18cd7 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -38,11 +38,11 @@ public class SuspendSortListenerTest {
@Test
public void testListener() throws Exception {
WorkflowContext context = new WorkflowContext();
- GroupResourceProcessForm groupResourceProcessForm = new GroupResourceProcessForm();
- context.setProcessForm(groupResourceProcessForm);
+ GroupResourceProcessForm groupResourceForm = new GroupResourceProcessForm();
+ context.setProcessForm(groupResourceForm);
InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
pulsarInfo.setInlongGroupId("1");
- groupResourceProcessForm.setGroupInfo(pulsarInfo);
+ groupResourceForm.setGroupInfo(pulsarInfo);
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index 6adc8fcea..e04db57ea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -19,15 +19,19 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -67,6 +71,36 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ GroupOperateType operateType = form.getGroupOperateType();
+ if (operateType != GroupOperateType.INIT) {
+ return false;
+ }
+ String groupId = form.getInlongGroupId();
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ MQType mqType = MQType.forType(groupInfo.getMqType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+ if (enable) {
+ log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
+ return true;
+ } else {
+ log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
+ return false;
+ }
+ }
+
+ log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
+ return false;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index ad05d1e87..41a97b2c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -19,7 +19,10 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -28,7 +31,7 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -66,6 +69,36 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ GroupOperateType operateType = form.getGroupOperateType();
+ if (operateType != GroupOperateType.INIT) {
+ return false;
+ }
+ String groupId = form.getInlongGroupId();
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ MQType mqType = MQType.forType(groupInfo.getMqType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+ if (enable) {
+ log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
+ return true;
+ } else {
+ log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
+ return false;
+ }
+ }
+
+ log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
+ return false;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
index 262076451..7cbe50fd6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
@@ -18,14 +18,19 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
@@ -63,6 +68,38 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ return false;
+ }
+ StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+ GroupOperateType operateType = streamResourceProcessForm.getGroupOperateType();
+ if (operateType != GroupOperateType.INIT) {
+ return false;
+ }
+
+ InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+ MQType mqType = MQType.forType(groupInfo.getMqType());
+ String groupId = groupInfo.getInlongGroupId();
+ String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+ if (enable) {
+ log.info("need to create pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return true;
+ } else {
+ log.info("skip to create pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+ }
+ return false;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 1d18994ee..6ecafd029 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -28,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
@@ -57,6 +60,38 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+    /**
+     * Checks whether this listener should create a Pulsar topic for the given workflow context.
+     *
+     * <p>The context is accepted only when its process form is a {@link StreamResourceProcessForm}
+     * with the {@code INIT} operate type, the group's MQ type is {@code PULSAR} or {@code TDMQ_PULSAR},
+     * and the group's enableCreateResource flag equals {@link InlongConstants#ENABLE_CREATE_RESOURCE}.
+     *
+     * @param context workflow context carrying the process form
+     * @return {@code true} if the topic creation should run, {@code false} otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm candidate = context.getProcessForm();
+        if (!(candidate instanceof StreamResourceProcessForm)) {
+            return false;
+        }
+        StreamResourceProcessForm form = (StreamResourceProcessForm) candidate;
+        if (GroupOperateType.INIT != form.getGroupOperateType()) {
+            return false;
+        }
+
+        InlongGroupInfo group = form.getGroupInfo();
+        MQType type = MQType.forType(group.getMqType());
+        // Both ids are read up front so they are available for the log lines below.
+        String groupId = group.getInlongGroupId();
+        String streamId = form.getStreamInfo().getInlongStreamId();
+        boolean pulsarLike = type == MQType.PULSAR || type == MQType.TDMQ_PULSAR;
+        if (!pulsarLike) {
+            return false;
+        }
+
+        String createFlag = ((InlongPulsarInfo) group).getEnableCreateResource();
+        if (!InlongConstants.ENABLE_CREATE_RESOURCE.equals(createFlag)) {
+            log.info("skip to create pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+        log.info("need to create pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
+                groupId, streamId);
+        return true;
+    }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
index 437005eae..187fdb845 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
@@ -19,9 +19,11 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
@@ -52,6 +54,24 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+    /**
+     * Checks whether this listener should run for the given workflow context.
+     *
+     * <p>The context is accepted only when its process form is a {@link GroupResourceProcessForm}
+     * and the MQ type of its group info resolves to {@code MQType.TUBE}.
+     *
+     * @param context workflow context carrying the process form
+     * @return {@code true} if the group uses TubeMQ, {@code false} otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        String groupId = form.getInlongGroupId();
+        MQType mqType = MQType.forType(form.getGroupInfo().getMqType());
+        if (mqType == MQType.TUBE) {
+            log.info("need to create tube resource for groupId [{}]", groupId);
+            return true;
+        }
+
+        // Message previously read "skip to to create"; the duplicated word is removed.
+        log.warn("skip to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
+        return false;
+    }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index 4c422c791..f4d6dd204 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -19,11 +19,13 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -50,6 +52,24 @@ public class CreateTubeTopicTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+    /**
+     * Checks whether this listener should run for the given workflow context.
+     *
+     * <p>The context is accepted only when its process form is a {@link GroupResourceProcessForm}
+     * and the MQ type of its group info resolves to {@code MQType.TUBE}.
+     *
+     * @param context workflow context carrying the process form
+     * @return {@code true} if the group uses TubeMQ, {@code false} otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        String groupId = form.getInlongGroupId();
+        MQType mqType = MQType.forType(form.getGroupInfo().getMqType());
+        if (mqType == MQType.TUBE) {
+            log.info("need to create tube resource for groupId [{}]", groupId);
+            return true;
+        }
+
+        // Message previously read "skip to to create"; the duplicated word is removed.
+        log.warn("skip to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
+        return false;
+    }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
index f63afe0e9..978d75fbd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
@@ -18,7 +18,10 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -27,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -64,6 +68,36 @@ public class DeletePulsarResourceTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+    /**
+     * Checks whether this listener should delete the Pulsar resource for the given workflow context.
+     *
+     * <p>The context is accepted only when its process form is a {@link GroupResourceProcessForm}
+     * with the {@code DELETE} operate type, the group's MQ type is {@code PULSAR} or {@code TDMQ_PULSAR},
+     * and the group's enableCreateResource flag equals {@link InlongConstants#ENABLE_CREATE_RESOURCE}.
+     *
+     * @param context workflow context carrying the process form
+     * @return {@code true} if the resource deletion should run, {@code false} otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm candidate = context.getProcessForm();
+        if (!(candidate instanceof GroupResourceProcessForm)) {
+            return false;
+        }
+        GroupResourceProcessForm groupForm = (GroupResourceProcessForm) candidate;
+        if (GroupOperateType.DELETE != groupForm.getGroupOperateType()) {
+            return false;
+        }
+
+        String groupId = groupForm.getInlongGroupId();
+        InlongGroupInfo group = groupForm.getGroupInfo();
+        MQType type = MQType.forType(group.getMqType());
+        if (type != MQType.PULSAR && type != MQType.TDMQ_PULSAR) {
+            log.warn("skip to delete pulsar subscription as the mq type is {} for groupId [{}]", type, groupId);
+            return false;
+        }
+
+        String createFlag = ((InlongPulsarInfo) group).getEnableCreateResource();
+        if (!InlongConstants.ENABLE_CREATE_RESOURCE.equals(createFlag)) {
+            log.info("skip to delete pulsar resource as the createResource was false for groupId [{}]", groupId);
+            return false;
+        }
+        log.info("need to delete pulsar resource as the createResource was true for groupId [{}]", groupId);
+        return true;
+    }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
index 9530d679b..4b77a86af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
@@ -21,6 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -28,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
@@ -57,6 +60,38 @@ public class DeletePulsarTopicTaskListener implements QueueOperateListener {
return TaskEvent.COMPLETE;
}
+    /**
+     * Checks whether this listener should delete the Pulsar topic for the given workflow context.
+     *
+     * <p>The context is accepted only when its process form is a {@link StreamResourceProcessForm}
+     * with the {@code DELETE} operate type, the group's MQ type is {@code PULSAR} or {@code TDMQ_PULSAR},
+     * and the group's enableCreateResource flag equals {@link InlongConstants#ENABLE_CREATE_RESOURCE}.
+     *
+     * @param context workflow context carrying the process form
+     * @return {@code true} if the topic deletion should run, {@code false} otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm candidate = context.getProcessForm();
+        if (!(candidate instanceof StreamResourceProcessForm)) {
+            return false;
+        }
+        StreamResourceProcessForm form = (StreamResourceProcessForm) candidate;
+        if (GroupOperateType.DELETE != form.getGroupOperateType()) {
+            return false;
+        }
+
+        InlongGroupInfo group = form.getGroupInfo();
+        MQType type = MQType.forType(group.getMqType());
+        // Both ids are read up front so they are available for the log lines below.
+        String groupId = group.getInlongGroupId();
+        String streamId = form.getStreamInfo().getInlongStreamId();
+        boolean pulsarLike = type == MQType.PULSAR || type == MQType.TDMQ_PULSAR;
+        if (!pulsarLike) {
+            return false;
+        }
+
+        String createFlag = ((InlongPulsarInfo) group).getEnableCreateResource();
+        if (!InlongConstants.ENABLE_CREATE_RESOURCE.equals(createFlag)) {
+            log.info("skip to delete pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
+                    groupId, streamId);
+            return false;
+        }
+        log.info("need to delete pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
+                groupId, streamId);
+        return true;
+    }
+
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
deleted file mode 100644
index 8c2015fbe..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of pulsar event for creating pulsar resource.
- */
-@Slf4j
-public class PulsarResourceCreateSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- GroupOperateType operateType = form.getGroupOperateType();
- if (operateType != GroupOperateType.INIT) {
- return false;
- }
- String groupId = form.getInlongGroupId();
- InlongGroupInfo groupInfo = form.getGroupInfo();
- MQType mqType = MQType.forType(groupInfo.getMqType());
- if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
- if (enable) {
- log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
- return true;
- } else {
- log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
- return false;
- }
- }
-
- log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
- return false;
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
deleted file mode 100644
index 8ceabd306..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of pulsar event for deleting pulsar resource
- */
-@Slf4j
-public class PulsarResourceDeleteSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- GroupOperateType operateType = form.getGroupOperateType();
- if (operateType != GroupOperateType.DELETE) {
- return false;
- }
- String groupId = form.getInlongGroupId();
- InlongGroupInfo groupInfo = form.getGroupInfo();
- MQType mqType = MQType.forType(groupInfo.getMqType());
- if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
- if (enable) {
- log.info("need to delete pulsar resource as the createResource was true for groupId [{}]", groupId);
- return true;
- } else {
- log.info("skip to delete pulsar resource as the createResource was false for groupId [{}]", groupId);
- return false;
- }
- }
-
- log.warn("skip to delete pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
- return false;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
deleted file mode 100644
index e27a721a6..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-@Slf4j
-public class PulsarTopicCreateSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- return false;
- }
- StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
- GroupOperateType operateType = streamResourceProcessForm.getGroupOperateType();
- if (operateType != GroupOperateType.INIT) {
- return false;
- }
- MQType mqType = MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
- String groupId = streamResourceProcessForm.getGroupInfo().getInlongGroupId();
- String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
- if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) streamResourceProcessForm.getGroupInfo();
- boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
- if (enable) {
- log.info("need to create pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
- groupId, streamId);
- return true;
- } else {
- log.info("skip to create pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
- groupId, streamId);
- return false;
- }
- }
- return false;
-
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
deleted file mode 100644
index 43c1d22bb..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-@Slf4j
-public class PulsarTopicDeleteSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof StreamResourceProcessForm)) {
- return false;
- }
- StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
- GroupOperateType operateType = streamResourceProcessForm.getGroupOperateType();
- if (operateType != GroupOperateType.DELETE) {
- return false;
- }
- MQType mqType = MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
- String groupId = streamResourceProcessForm.getGroupInfo().getInlongGroupId();
- String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
- if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) streamResourceProcessForm.getGroupInfo();
- boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
- if (enable) {
- log.info("need to delete pulsar topic as the createResource was true for groupId [{}] streamId [{}]",
- groupId, streamId);
- return true;
- } else {
- log.info("skip to delete pulsar topic as the createResource was false for groupId [{}] streamId [{}]",
- groupId, streamId);
- return false;
- }
- }
- return false;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/TubeEventSelector.java
deleted file mode 100644
index 410a344fd..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/TubeEventSelector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of tube event for create tube source.
- */
-@Slf4j
-public class TubeEventSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- String groupId = form.getInlongGroupId();
- MQType mqType = MQType.forType(form.getGroupInfo().getMqType());
- if (mqType == MQType.TUBE) {
- log.info("need to create tube resource for groupId [{}]", groupId);
- return true;
- }
-
- log.warn("skip to to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
- return false;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
index 242c3bf2a..7d9ebe220 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -54,6 +55,12 @@ public class StreamSinkResourceListener implements SinkOperateListener {
return TaskEvent.COMPLETE;
}
+    /**
+     * Accepts the workflow context only when its process form is a {@link StreamResourceProcessForm}.
+     *
+     * @param context workflow context carrying the process form
+     * @return {@code true} if the process form is a stream resource form, {@code false} otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        final ProcessForm submittedForm = context.getProcessForm();
+        boolean isStreamForm = submittedForm instanceof StreamResourceProcessForm;
+        return isStreamForm;
+    }
+
@Override
public ListenerResult listen(WorkflowContext context) {
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
index 14d8f3258..b97d13e72 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
@@ -17,11 +17,15 @@
package org.apache.inlong.manager.service.sort;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +54,33 @@ public class SortConfigListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (processForm instanceof GroupResourceProcessForm) {
+ GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
+ boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
+ && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
+
+ LOGGER.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
+ return enable;
+ } else if (processForm instanceof StreamResourceProcessForm) {
+ StreamResourceProcessForm streamResourceForm = (StreamResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = streamResourceForm.getGroupInfo();
+ InlongStreamInfo streamInfo = streamResourceForm.getStreamInfo();
+ boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
+ && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
+ LOGGER.info("zookeeper disabled was [{}] for groupId [{}] and streamId [{}] ", enable, groupId,
+ streamInfo.getInlongStreamId());
+ return enable;
+ } else {
+ LOGGER.info("zk disabled for groupId [{}]", groupId);
+ return false;
+ }
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
index dda1086fc..6f4293a65 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
@@ -18,11 +18,15 @@
package org.apache.inlong.manager.service.sort;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -53,6 +57,23 @@ public class StreamSortConfigListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ LOGGER.warn("zookeeper enabled was [false] for groupId [{}]", groupId);
+ return false;
+ }
+
+ GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+ InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
+ boolean enable = InlongConstants.ENABLE_ZK.equals(groupInfo.getEnableZookeeper())
+ && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
+ LOGGER.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
+ return enable;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperDisabledSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperDisabledSelector.java
deleted file mode 100644
index cac93aa5f..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperDisabledSelector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Event selector for whether ZooKeeper is disabled.
- */
-@Slf4j
-public class ZookeeperDisabledSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (processForm instanceof GroupResourceProcessForm) {
- GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
- && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
-
- log.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
- return enable;
- } else if (processForm instanceof StreamResourceProcessForm) {
- StreamResourceProcessForm streamResourceForm = (StreamResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = streamResourceForm.getGroupInfo();
- InlongStreamInfo streamInfo = streamResourceForm.getStreamInfo();
- boolean enable = InlongConstants.DISABLE_ZK.equals(groupInfo.getEnableZookeeper())
- && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
- log.info("zookeeper disabled was [{}] for groupId [{}] and streamId [{}] ", enable, groupId,
- streamInfo.getInlongStreamId());
- return enable;
- } else {
- log.info("zk disabled for groupId [{}]", groupId);
- return false;
- }
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperEnabledSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperEnabledSelector.java
deleted file mode 100644
index 2c3d21952..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/ZookeeperEnabledSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Event selector for whether ZooKeeper is enabled.
- */
-@Slf4j
-@Deprecated
-public class ZookeeperEnabledSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- log.warn("zookeeper enabled was [false] for groupId [{}]", groupId);
- return false;
- }
-
- GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- boolean enable = InlongConstants.ENABLE_ZK.equals(groupInfo.getEnableZookeeper())
- && MQType.forType(groupInfo.getMqType()) != MQType.NONE;
- log.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
- return enable;
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
index cd62aceec..9ec754ceb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
@@ -40,7 +40,7 @@ import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
@Component
-public abstract class AbstractSourceOperateListener implements DataSourceOperateListener {
+public abstract class AbstractSourceOperateListener implements SourceOperateListener {
@Autowired
protected InlongStreamService streamService;
@@ -146,7 +146,7 @@ public abstract class AbstractSourceOperateListener implements DataSourceOperate
return CommonBeanUtils.copyProperties((KafkaSource) streamSource, KafkaSourceRequest::new);
default:
throw new IllegalArgumentException(
- String.format("Unsupported type=%s for DataSourceOperateListener", type));
+ String.format("Unsupported type=%s for SourceOperateListener", type));
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteEventSelector.java
deleted file mode 100644
index 087892ed5..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteEventSelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.source.listener;
-
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of source delete event.
- */
-public class SourceDeleteEventSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- return groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
index 16faeebb1..0277cc9d4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
@@ -18,7 +18,11 @@
package org.apache.inlong.manager.service.source.listener;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
import org.springframework.stereotype.Component;
/**
@@ -33,6 +37,16 @@ public class SourceDeleteListener extends AbstractSourceOperateListener {
return getClass().getSimpleName();
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+ return groupResourceForm.getGroupOperateType() == GroupOperateType.DELETE;
+ }
+
@Override
public void operateStreamSource(SourceRequest sourceRequest, String operator) {
streamSourceService.delete(sourceRequest.getId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartEventSelector.java
deleted file mode 100644
index 0d3b2f57b..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartEventSelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.source.listener;
-
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of source restart event.
- */
-public class SourceRestartEventSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- return groupResourceProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
index 7422c2312..ea5071dfd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
@@ -17,7 +17,11 @@
package org.apache.inlong.manager.service.source.listener;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
import org.springframework.stereotype.Component;
/**
@@ -31,6 +35,16 @@ public class SourceRestartListener extends AbstractSourceOperateListener {
return getClass().getSimpleName();
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+ return groupResourceForm.getGroupOperateType() == GroupOperateType.RESTART;
+ }
+
@Override
public void operateStreamSource(SourceRequest sourceRequest, String operator) {
streamSourceService.restart(sourceRequest.getId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopEventSelector.java
deleted file mode 100644
index 9d01cb017..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopEventSelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.source.listener;
-
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-/**
- * Selector of source stop event.
- */
-public class SourceStopEventSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
- return groupResourceProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
index 6203fccca..4d985ba10 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
@@ -17,7 +17,11 @@
package org.apache.inlong.manager.service.source.listener;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
import org.springframework.stereotype.Component;
/**
@@ -31,6 +35,16 @@ public class SourceStopListener extends AbstractSourceOperateListener {
return getClass().getSimpleName();
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
+ return groupResourceForm.getGroupOperateType() == GroupOperateType.SUSPEND;
+ }
+
@Override
public void operateStreamSource(SourceRequest sourceRequest, String operator) {
streamSourceService.stop(sourceRequest.getId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
index c50bafd26..cefa3e7d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
@@ -23,8 +23,8 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 421bca1c0..b582658ca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -18,8 +18,8 @@
package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.common.enums.ProcessName;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.group.listener.InitGroupCompleteListener;
import org.apache.inlong.manager.service.workflow.group.listener.InitGroupFailedListener;
@@ -73,32 +73,32 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
ServiceTask initMQTask = new ServiceTask();
initMQTask.setName("InitMQ");
initMQTask.setDisplayName("Group-InitMQ");
- initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
- initMQTask.addListenerProvider(groupTaskListenerFactory);
+ initMQTask.setServiceTaskType(ServiceTaskType.INIT_MQ);
+ initMQTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(initMQTask);
// Init Sink
ServiceTask initSinkTask = new ServiceTask();
initSinkTask.setName("InitSink");
initSinkTask.setDisplayName("Group-InitSink");
- initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
- initSinkTask.addListenerProvider(groupTaskListenerFactory);
+ initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
+ initSinkTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(initSinkTask);
// Init Sort
ServiceTask initSortTask = new ServiceTask();
initSortTask.setName("InitSort");
initSortTask.setDisplayName("Group-InitSort");
- initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
- initSortTask.addListenerProvider(groupTaskListenerFactory);
+ initSortTask.setServiceTaskType(ServiceTaskType.INIT_SORT);
+ initSortTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(initSortTask);
// Init Source
ServiceTask initSourceTask = new ServiceTask();
initSourceTask.setName("InitSource");
initSourceTask.setDisplayName("Group-InitSource");
- initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
- initSourceTask.addListenerProvider(groupTaskListenerFactory);
+ initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
+ initSourceTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(initSourceTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 0bb526bde..3c17ecad3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -73,24 +73,24 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
ServiceTask deleteSourceTask = new ServiceTask();
deleteSourceTask.setName("DeleteSource");
deleteSourceTask.setDisplayName("Group-DeleteSource");
- deleteSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
- deleteSourceTask.addListenerProvider(groupTaskListenerFactory);
+ deleteSourceTask.setServiceTaskType(ServiceTaskType.DELETE_SOURCE);
+ deleteSourceTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(deleteSourceTask);
// Delete MQ
ServiceTask deleteMQTask = new ServiceTask();
deleteMQTask.setName("DeleteMQ");
deleteMQTask.setDisplayName("Group-DeleteMQ");
- deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
- deleteMQTask.addListenerProvider(groupTaskListenerFactory);
+ deleteMQTask.setServiceTaskType(ServiceTaskType.DELETE_MQ);
+ deleteMQTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(deleteMQTask);
// Delete Sort
ServiceTask deleteSortTask = new ServiceTask();
deleteSortTask.setName("DeleteSort");
deleteSortTask.setDisplayName("Group-DeleteSort");
- deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
- deleteSortTask.addListenerProvider(groupTaskListenerFactory);
+ deleteSortTask.setServiceTaskType(ServiceTaskType.DELETE_SORT);
+ deleteSortTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(deleteSortTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index a8d1afb27..4449b78e5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
ServiceTask restartSortTask = new ServiceTask();
restartSortTask.setName("RestartSort");
restartSortTask.setDisplayName("Group-RestartSort");
- restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
- restartSortTask.addListenerProvider(taskListenerFactory);
+ restartSortTask.setServiceTaskType(ServiceTaskType.RESTART_SORT);
+ restartSortTask.setListenerFactory(taskListenerFactory);
process.addTask(restartSortTask);
// Restart Source
ServiceTask restartSourceTask = new ServiceTask();
restartSourceTask.setName("RestartSource");
restartSourceTask.setDisplayName("Group-RestartSource");
- restartSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
- restartSourceTask.addListenerProvider(taskListenerFactory);
+ restartSourceTask.setServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+ restartSourceTask.setListenerFactory(taskListenerFactory);
process.addTask(restartSourceTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
index e59819b3f..d9aaf588e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
ServiceTask stopSourceTask = new ServiceTask();
stopSourceTask.setName("StopSource");
stopSourceTask.setDisplayName("Group-StopSource");
- stopSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
- stopSourceTask.addListenerProvider(groupTaskListenerFactory);
+ stopSourceTask.setServiceTaskType(ServiceTaskType.STOP_SOURCE);
+ stopSourceTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(stopSourceTask);
// Stop Sort
ServiceTask stopSortTask = new ServiceTask();
stopSortTask.setName("StopSort");
stopSortTask.setDisplayName("Group-StopSort");
- stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
- stopSortTask.addListenerProvider(groupTaskListenerFactory);
+ stopSortTask.setServiceTaskType(ServiceTaskType.STOP_SORT);
+ stopSortTask.setListenerFactory(groupTaskListenerFactory);
process.addTask(stopSortTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
index aed5385d9..eb8179e67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
@@ -24,7 +24,10 @@ import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.StreamSourceService;
@@ -48,6 +51,8 @@ public class InitGroupCompleteListener implements ProcessEventListener {
private InlongStreamService streamService;
@Autowired
private StreamSourceService sourceService;
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
@Override
public ProcessEvent event() {
@@ -70,7 +75,10 @@ public class InitGroupCompleteListener implements ProcessEventListener {
InlongGroupInfo groupInfo = form.getGroupInfo();
String operator = context.getOperator();
groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
- groupService.update(groupInfo.genRequest(), operator);
+ InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId);
+ InlongGroupRequest updateGroupRequest = groupInfo.genRequest();
+ updateGroupRequest.setVersion(existGroup.getVersion());
+ groupService.update(updateGroupRequest, operator);
// update status of other related configs
streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index 650bef608..552f5e1e9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -19,31 +19,23 @@ package org.apache.inlong.manager.service.workflow.listener;
import com.google.common.collect.Lists;
import lombok.Data;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
import org.apache.inlong.manager.service.mq.DeletePulsarResourceTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarResourceCreateSelector;
-import org.apache.inlong.manager.service.mq.PulsarResourceDeleteSelector;
-import org.apache.inlong.manager.service.mq.TubeEventSelector;
import org.apache.inlong.manager.service.resource.SinkResourceListener;
import org.apache.inlong.manager.service.sort.SortConfigListener;
-import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
-import org.apache.inlong.manager.service.source.listener.SourceDeleteEventSelector;
import org.apache.inlong.manager.service.source.listener.SourceDeleteListener;
-import org.apache.inlong.manager.service.source.listener.SourceRestartEventSelector;
import org.apache.inlong.manager.service.source.listener.SourceRestartListener;
-import org.apache.inlong.manager.service.source.listener.SourceStopEventSelector;
import org.apache.inlong.manager.service.source.listener.SourceStopListener;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskListenerProvider;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.plugin.Plugin;
import org.apache.inlong.manager.workflow.plugin.PluginBinder;
@@ -54,22 +46,19 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
/**
- * Service task listener factory.
+ * The TaskEventListener factory for InlongGroup.
*/
@Data
@Component
-public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListenerProvider {
+public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFactory {
- private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
-
- private Map<QueueOperateListener, EventSelector> queueOperateListeners;
-
- private Map<SortOperateListener, EventSelector> sortOperateListeners;
+ private List<SourceOperateListener> sourceOperateListeners;
+ private List<QueueOperateListener> queueOperateListeners;
+ private List<SortOperateListener> sortOperateListeners;
@Autowired
private SourceStopListener sourceStopListener;
@@ -77,7 +66,6 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
private SourceRestartListener sourceRestartListener;
@Autowired
private SourceDeleteListener sourceDeleteListener;
-
@Autowired
private CreateTubeTopicTaskListener createTubeTopicTaskListener;
@Autowired
@@ -88,7 +76,6 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
@Autowired
private DeletePulsarResourceTaskListener deletePulsarResourceTaskListener;
-
@Autowired
private SinkResourceListener sinkResourceListener;
@Autowired
@@ -96,29 +83,43 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
@PostConstruct
public void init() {
- sourceOperateListeners = new LinkedHashMap<>();
- sourceOperateListeners.put(sourceStopListener, new SourceStopEventSelector());
- sourceOperateListeners.put(sourceDeleteListener, new SourceDeleteEventSelector());
- sourceOperateListeners.put(sourceRestartListener, new SourceRestartEventSelector());
- queueOperateListeners = new LinkedHashMap<>();
- queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
- queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
- queueOperateListeners.put(createPulsarResourceTaskListener, new PulsarResourceCreateSelector());
- queueOperateListeners.put(createPulsarGroupTaskListener, new PulsarResourceCreateSelector());
- queueOperateListeners.put(deletePulsarResourceTaskListener, new PulsarResourceDeleteSelector());
- sortOperateListeners = new LinkedHashMap<>();
- sortOperateListeners.put(sortConfigListener, new ZookeeperDisabledSelector());
+ sourceOperateListeners = new LinkedList<>();
+ sourceOperateListeners.add(sourceStopListener);
+ sourceOperateListeners.add(sourceDeleteListener);
+ sourceOperateListeners.add(sourceRestartListener);
+ queueOperateListeners = new LinkedList<>();
+ queueOperateListeners.add(createTubeTopicTaskListener);
+ queueOperateListeners.add(createTubeGroupTaskListener);
+ queueOperateListeners.add(createPulsarResourceTaskListener);
+ queueOperateListeners.add(createPulsarGroupTaskListener);
+ queueOperateListeners.add(deletePulsarResourceTaskListener);
+ sortOperateListeners = new LinkedList<>();
+ sortOperateListeners.add(sortConfigListener);
}
- public void clearListeners() {
- sourceOperateListeners = new LinkedHashMap<>();
- queueOperateListeners = new LinkedHashMap<>();
- sortOperateListeners = new LinkedHashMap<>();
+ @Override
+ public void acceptPlugin(Plugin plugin) {
+ if (!(plugin instanceof ProcessPlugin)) {
+ return;
+ }
+ ProcessPlugin processPlugin = (ProcessPlugin) plugin;
+ List<SourceOperateListener> pluginSourceOperateListeners = processPlugin.createSourceOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginSourceOperateListeners)) {
+ sourceOperateListeners.addAll(pluginSourceOperateListeners); // reuse the list checked above
+ }
+ List<QueueOperateListener> pluginQueueOperateListeners = processPlugin.createQueueOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginQueueOperateListeners)) {
+ queueOperateListeners.addAll(pluginQueueOperateListeners);
+ }
+ List<SortOperateListener> pluginSortOperateListeners = processPlugin.createSortOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginSortOperateListeners)) {
+ sortOperateListeners.addAll(pluginSortOperateListeners);
+ }
}
@Override
- public List<TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType serviceTaskType) {
- switch (serviceTaskType) {
+ public List<? extends TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType taskType) {
+ switch (taskType) {
case INIT_MQ:
case DELETE_MQ:
List<QueueOperateListener> queueOperateListeners = getQueueOperateListener(workflowContext);
@@ -133,24 +134,32 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
case STOP_SOURCE:
case RESTART_SOURCE:
case DELETE_SOURCE:
- List<DataSourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
+ List<SourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
return Lists.newArrayList(sourceOperateListeners);
case INIT_SINK:
return Collections.singletonList(sinkResourceListener);
default:
- throw new IllegalArgumentException(String.format("UnSupport ServiceTaskType %s", serviceTaskType));
+ throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
}
}
/**
- * Get data source operate listener list.
+ * Clear the list of listeners.
+ */
+ public void clearListeners() {
+ sourceOperateListeners = new LinkedList<>();
+ queueOperateListeners = new LinkedList<>();
+ sortOperateListeners = new LinkedList<>();
+ }
+
+ /**
+ * Get stream source operate listener list.
*/
- public List<DataSourceOperateListener> getSourceOperateListener(WorkflowContext context) {
- List<DataSourceOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<DataSourceOperateListener, EventSelector> entry : sourceOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ public List<SourceOperateListener> getSourceOperateListener(WorkflowContext context) {
+ List<SourceOperateListener> listeners = new ArrayList<>();
+ for (SourceOperateListener listener : sourceOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
@@ -161,10 +170,9 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
*/
public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
List<QueueOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<QueueOperateListener, EventSelector> entry : queueOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ for (QueueOperateListener listener : queueOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
@@ -175,36 +183,12 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
*/
public List<SortOperateListener> getSortOperateListener(WorkflowContext context) {
List<SortOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<SortOperateListener, EventSelector> entry : sortOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ for (SortOperateListener listener : sortOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
}
- @Override
- public void acceptPlugin(Plugin plugin) {
- if (!(plugin instanceof ProcessPlugin)) {
- return;
- }
- ProcessPlugin processPlugin = (ProcessPlugin) plugin;
- Map<DataSourceOperateListener, EventSelector> pluginDsOperateListeners =
- processPlugin.createSourceOperateListeners();
- if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
- sourceOperateListeners.putAll(processPlugin.createSourceOperateListeners());
- }
- Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
- processPlugin.createQueueOperateListeners();
- if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
- queueOperateListeners.putAll(pluginQueueOperateListeners);
- }
- Map<SortOperateListener, EventSelector> pluginSortOperateListeners =
- processPlugin.createSortOperateListeners();
- if (MapUtils.isNotEmpty(pluginSortOperateListeners)) {
- sortOperateListeners.putAll(pluginSortOperateListeners);
- }
- }
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
index f556c9b34..b894817eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
@@ -18,26 +18,20 @@
package org.apache.inlong.manager.service.workflow.listener;
import com.google.common.collect.Lists;
-import lombok.Data;
-import org.apache.commons.collections.MapUtils;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.service.mq.CreatePulsarSubscriptionTaskListener;
import org.apache.inlong.manager.service.mq.CreatePulsarTopicTaskListener;
import org.apache.inlong.manager.service.mq.DeletePulsarTopicTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarTopicCreateSelector;
-import org.apache.inlong.manager.service.mq.PulsarTopicDeleteSelector;
import org.apache.inlong.manager.service.resource.StreamSinkResourceListener;
import org.apache.inlong.manager.service.sort.StreamSortConfigListener;
-import org.apache.inlong.manager.service.sort.ZookeeperEnabledSelector;
import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskListenerProvider;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.plugin.Plugin;
import org.apache.inlong.manager.workflow.plugin.PluginBinder;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
@@ -46,21 +40,19 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-@Data
+/**
+ * The TaskEventListener factory for InlongStream.
+ */
@Component
-public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListenerProvider {
-
- private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
-
- private Map<QueueOperateListener, EventSelector> queueOperateListeners;
-
- private Map<SortOperateListener, EventSelector> sortOperateListeners;
+public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFactory {
- private Map<SinkOperateListener, EventSelector> sinkOperateListeners;
+ private List<SourceOperateListener> sourceOperateListeners;
+ private List<QueueOperateListener> queueOperateListeners;
+ private List<SortOperateListener> sortOperateListeners;
+ private List<SinkOperateListener> sinkOperateListeners;
@Autowired
private CreatePulsarTopicTaskListener createPulsarTopicTaskListener;
@@ -75,23 +67,44 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
@PostConstruct
public void init() {
- sourceOperateListeners = new LinkedHashMap<>();
- queueOperateListeners = new LinkedHashMap<>();
- queueOperateListeners.put(createPulsarTopicTaskListener, new PulsarTopicCreateSelector());
- queueOperateListeners.put(createPulsarSubscriptionTaskListener, new PulsarTopicCreateSelector());
- queueOperateListeners.put(deletePulsarTopicTaskListener, new PulsarTopicDeleteSelector());
- sortOperateListeners = new LinkedHashMap<>();
- sortOperateListeners.put(streamSortConfigListener, new ZookeeperEnabledSelector());
- sinkOperateListeners = new LinkedHashMap<>();
- sinkOperateListeners.put(sinkResourceListener, context -> {
- ProcessForm processForm = context.getProcessForm();
- return processForm instanceof StreamResourceProcessForm;
- });
+ sourceOperateListeners = new LinkedList<>();
+ queueOperateListeners = new LinkedList<>();
+ queueOperateListeners.add(createPulsarTopicTaskListener);
+ queueOperateListeners.add(createPulsarSubscriptionTaskListener);
+ queueOperateListeners.add(deletePulsarTopicTaskListener);
+ sortOperateListeners = new LinkedList<>();
+ sortOperateListeners.add(streamSortConfigListener);
+ sinkOperateListeners = new LinkedList<>();
+ sinkOperateListeners.add(sinkResourceListener);
+ }
+
+ @Override
+ public void acceptPlugin(Plugin plugin) {
+ if (!(plugin instanceof ProcessPlugin)) {
+ return;
+ }
+ ProcessPlugin processPlugin = (ProcessPlugin) plugin;
+ List<SourceOperateListener> pluginSourceOperateListeners = processPlugin.createSourceOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginSourceOperateListeners)) {
+ sourceOperateListeners.addAll(pluginSourceOperateListeners); // reuse the list checked above
+ }
+ List<QueueOperateListener> pluginQueueOperateListeners = processPlugin.createQueueOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginQueueOperateListeners)) {
+ queueOperateListeners.addAll(pluginQueueOperateListeners);
+ }
+ List<SortOperateListener> pluginSortOperateListeners = processPlugin.createSortOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginSortOperateListeners)) {
+ sortOperateListeners.addAll(pluginSortOperateListeners);
+ }
+ List<SinkOperateListener> pluginSinkOperateListeners = processPlugin.createSinkOperateListeners();
+ if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) {
+ sinkOperateListeners.addAll(pluginSinkOperateListeners);
+ }
}
@Override
- public Iterable get(WorkflowContext workflowContext, ServiceTaskType serviceTaskType) {
- switch (serviceTaskType) {
+ public List<? extends TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType taskType) {
+ switch (taskType) {
case INIT_MQ:
case DELETE_MQ:
List<QueueOperateListener> queueOperateListeners = getQueueOperateListener(workflowContext);
@@ -106,25 +119,34 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
case STOP_SOURCE:
case RESTART_SOURCE:
case DELETE_SOURCE:
- List<DataSourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
+ List<SourceOperateListener> sourceOperateListeners = getSourceOperateListener(workflowContext);
return Lists.newArrayList(sourceOperateListeners);
case INIT_SINK:
List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext);
return Lists.newArrayList(sinkOperateListeners);
default:
- throw new IllegalArgumentException(String.format("UnSupport ServiceTaskType %s", serviceTaskType));
+ throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
}
}
/**
- * Get data source operate listener list.
+ * Clear the list of listeners.
*/
- public List<DataSourceOperateListener> getSourceOperateListener(WorkflowContext context) {
- List<DataSourceOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<DataSourceOperateListener, EventSelector> entry : sourceOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ public void clearListeners() {
+ sourceOperateListeners = new LinkedList<>();
+ queueOperateListeners = new LinkedList<>();
+ sortOperateListeners = new LinkedList<>();
+ sinkOperateListeners = new LinkedList<>();
+ }
+
+ /**
+ * Get stream source operate listener list.
+ */
+ public List<SourceOperateListener> getSourceOperateListener(WorkflowContext context) {
+ List<SourceOperateListener> listeners = new ArrayList<>();
+ for (SourceOperateListener listener : sourceOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
@@ -135,10 +157,9 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
*/
public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
List<QueueOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<QueueOperateListener, EventSelector> entry : queueOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ for (QueueOperateListener listener : queueOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
@@ -149,10 +170,9 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
*/
public List<SortOperateListener> getSortOperateListener(WorkflowContext context) {
List<SortOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<SortOperateListener, EventSelector> entry : sortOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ for (SortOperateListener listener : sortOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
@@ -161,41 +181,14 @@ public class StreamTaskListenerFactory implements PluginBinder, ServiceTaskListe
/**
* Get sink operate listener list.
*/
- public List<SinkOperateListener> getSinkOperateListener(WorkflowContext context) {
+ private List<SinkOperateListener> getSinkOperateListener(WorkflowContext context) {
List<SinkOperateListener> listeners = new ArrayList<>();
- for (Map.Entry<SinkOperateListener, EventSelector> entry : sinkOperateListeners.entrySet()) {
- EventSelector selector = entry.getValue();
- if (selector != null && selector.accept(context)) {
- listeners.add(entry.getKey());
+ for (SinkOperateListener listener : sinkOperateListeners) {
+ if (listener != null && listener.accept(context)) {
+ listeners.add(listener);
}
}
return listeners;
}
- @Override
- public void acceptPlugin(Plugin plugin) {
- if (!(plugin instanceof ProcessPlugin)) {
- return;
- }
- ProcessPlugin processPlugin = (ProcessPlugin) plugin;
- Map<DataSourceOperateListener, EventSelector> pluginDsOperateListeners =
- processPlugin.createSourceOperateListeners();
- if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
- sourceOperateListeners.putAll(processPlugin.createSourceOperateListeners());
- }
- Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
- processPlugin.createQueueOperateListeners();
- if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
- queueOperateListeners.putAll(pluginQueueOperateListeners);
- }
- Map<SortOperateListener, EventSelector> pluginSortOperateListeners =
- processPlugin.createSortOperateListeners();
- if (MapUtils.isNotEmpty(pluginSortOperateListeners)) {
- sortOperateListeners.putAll(pluginSortOperateListeners);
- }
- Map<SinkOperateListener, EventSelector> pluginSinkOperateListeners = processPlugin.createSinkOperateListeners();
- if (MapUtils.isNotEmpty(pluginSinkOperateListeners)) {
- sinkOperateListeners.putAll(pluginSinkOperateListeners);
- }
- }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 4afeb82e1..0183448ca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -73,32 +73,32 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask initMQTask = new ServiceTask();
initMQTask.setName("InitMQ");
initMQTask.setDisplayName("Stream-InitMQ");
- initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
- initMQTask.addListenerProvider(streamTaskListenerFactory);
+ initMQTask.setServiceTaskType(ServiceTaskType.INIT_MQ);
+ initMQTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(initMQTask);
// Init Sink
ServiceTask initSinkTask = new ServiceTask();
initSinkTask.setName("InitSink");
initSinkTask.setDisplayName("Stream-InitSink");
- initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
- initSinkTask.addListenerProvider(streamTaskListenerFactory);
+ initSinkTask.setServiceTaskType(ServiceTaskType.INIT_SINK);
+ initSinkTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(initSinkTask);
// Init Sort
ServiceTask initSortTask = new ServiceTask();
initSortTask.setName("InitSort");
initSortTask.setDisplayName("Stream-InitSort");
- initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
- initSortTask.addListenerProvider(streamTaskListenerFactory);
+ initSortTask.setServiceTaskType(ServiceTaskType.INIT_SORT);
+ initSortTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(initSortTask);
// Init Source
ServiceTask initSourceTask = new ServiceTask();
initSourceTask.setName("InitSource");
initSourceTask.setDisplayName("Stream-InitSource");
- initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
- initSourceTask.addListenerProvider(streamTaskListenerFactory);
+ initSourceTask.setServiceTaskType(ServiceTaskType.INIT_SOURCE);
+ initSourceTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(initSourceTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index b5919d907..886f35138 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -73,24 +73,24 @@ public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask deleteDataSourceTask = new ServiceTask();
deleteDataSourceTask.setName("DeleteSource");
deleteDataSourceTask.setDisplayName("Stream-DeleteSource");
- deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
- deleteDataSourceTask.addListenerProvider(streamTaskListenerFactory);
+ deleteDataSourceTask.setServiceTaskType(ServiceTaskType.DELETE_SOURCE);
+ deleteDataSourceTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(deleteDataSourceTask);
// Delete MQ
ServiceTask deleteMQTask = new ServiceTask();
deleteMQTask.setName("DeleteMQ");
deleteMQTask.setDisplayName("Stream-DeleteMQ");
- deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
- deleteMQTask.addListenerProvider(streamTaskListenerFactory);
+ deleteMQTask.setServiceTaskType(ServiceTaskType.DELETE_MQ);
+ deleteMQTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(deleteMQTask);
// Delete Sort
ServiceTask deleteSortTask = new ServiceTask();
deleteSortTask.setName("DeleteSort");
deleteSortTask.setDisplayName("Stream-DeleteSort");
- deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
- deleteSortTask.addListenerProvider(streamTaskListenerFactory);
+ deleteSortTask.setServiceTaskType(ServiceTaskType.DELETE_SORT);
+ deleteSortTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(deleteSortTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
index 8cd67a943..ca6f8f24b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class RestartStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask restartSortTask = new ServiceTask();
restartSortTask.setName("RestartSort");
restartSortTask.setDisplayName("Stream-RestartSort");
- restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
- restartSortTask.addListenerProvider(streamTaskListenerFactory);
+ restartSortTask.setServiceTaskType(ServiceTaskType.RESTART_SORT);
+ restartSortTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(restartSortTask);
// Restart Source
ServiceTask restartDataSourceTask = new ServiceTask();
restartDataSourceTask.setName("RestartSource");
restartDataSourceTask.setDisplayName("Stream-RestartSource");
- restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
- restartDataSourceTask.addListenerProvider(streamTaskListenerFactory);
+ restartDataSourceTask.setServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+ restartDataSourceTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(restartDataSourceTask);
// End node
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
index 4a51b57ec..5b0e9992d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
@@ -73,16 +73,16 @@ public class SuspendStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask stopDataSourceTask = new ServiceTask();
stopDataSourceTask.setName("StopSource");
stopDataSourceTask.setDisplayName("Stream-StopSource");
- stopDataSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
- stopDataSourceTask.addListenerProvider(streamTaskListenerFactory);
+ stopDataSourceTask.setServiceTaskType(ServiceTaskType.STOP_SOURCE);
+ stopDataSourceTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(stopDataSourceTask);
// Stop Sort
ServiceTask stopSortTask = new ServiceTask();
stopSortTask.setName("StopSort");
stopSortTask.setDisplayName("Stream-StopSort");
- stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
- stopSortTask.addListenerProvider(streamTaskListenerFactory);
+ stopSortTask.setServiceTaskType(ServiceTaskType.STOP_SORT);
+ stopSortTask.setListenerFactory(streamTaskListenerFactory);
process.addTask(stopSortTask);
// End node
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
index e65b18471..a41bcbe4d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
@@ -25,8 +25,8 @@ import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.lang.reflect.InvocationTargetException;
import java.util.Map;
+import java.util.Objects;
/**
* Test class for load plugin.
@@ -36,7 +36,7 @@ public class PluginClassLoaderTest {
@Test
public void testLoadPlugin() {
- String path = this.getClass().getClassLoader().getResource("").getPath();
+ String path = Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).getPath();
PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
Thread.currentThread().getContextClassLoader());
Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
@@ -49,11 +49,7 @@ public class PluginClassLoaderTest {
Class cls = pluginClassLoader.loadClass(pluginClass);
Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
Assertions.assertTrue(plugin instanceof ProcessPlugin);
- } catch (ClassNotFoundException
- | NoSuchMethodException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException e) {
+ } catch (Exception e) {
Assertions.assertTrue(e instanceof ClassNotFoundException);
Assertions.fail();
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java
index db5128672..5836accd7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSortListener.java
@@ -17,6 +17,9 @@
package org.apache.inlong.manager.service.mocks;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -32,6 +35,16 @@ public class MockDeleteSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ return form.getGroupOperateType() == GroupOperateType.DELETE;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) {
return ListenerResult.success("Mock delete sort success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java
index b4bd8f2f7..60d3e2b07 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockDeleteSourceListener.java
@@ -17,21 +17,34 @@
package org.apache.inlong.manager.service.mocks;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
/**
* Test class for listen delete source event.
*/
-public class MockDeleteSourceListener implements DataSourceOperateListener {
+public class MockDeleteSourceListener implements SourceOperateListener {
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ return form.getGroupOperateType() == GroupOperateType.DELETE;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) {
return ListenerResult.success("Mock delete source success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java
index dc8442ea8..a1ac977e6 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockPlugin.java
@@ -17,74 +17,33 @@
package org.apache.inlong.manager.service.mocks;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
/**
* Test class for process plugin.
*/
public class MockPlugin implements ProcessPlugin {
- public EventSelector stopProcessSelector = new EventSelector() {
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- return form.getGroupOperateType() == GroupOperateType.SUSPEND;
- }
- };
-
- public EventSelector restartProcessSelector = new EventSelector() {
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- return form.getGroupOperateType() == GroupOperateType.RESTART;
- }
- };
-
- public EventSelector deleteProcessSelector = new EventSelector() {
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- return false;
- }
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- return form.getGroupOperateType() == GroupOperateType.DELETE;
- }
- };
-
@Override
- public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
- Map<DataSourceOperateListener, EventSelector> listeners = new HashMap<>();
- listeners.put(new MockDeleteSourceListener(), deleteProcessSelector);
- listeners.put(new MockRestartSourceListener(), restartProcessSelector);
- listeners.put(new MockStopSourceListener(), stopProcessSelector);
+ public List<SourceOperateListener> createSourceOperateListeners() {
+ List<SourceOperateListener> listeners = new ArrayList<>();
+ listeners.add(new MockDeleteSourceListener());
+ listeners.add(new MockRestartSourceListener());
+ listeners.add(new MockStopSourceListener());
return listeners;
}
@Override
- public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
- Map<SortOperateListener, EventSelector> listeners = new HashMap<>();
- listeners.put(new MockDeleteSortListener(), deleteProcessSelector);
- listeners.put(new MockRestartSortListener(), restartProcessSelector);
- listeners.put(new MockStopSortListener(), stopProcessSelector);
+ public List<SortOperateListener> createSortOperateListeners() {
+ List<SortOperateListener> listeners = new ArrayList<>();
+ listeners.add(new MockDeleteSortListener());
+ listeners.add(new MockRestartSortListener());
+ listeners.add(new MockStopSortListener());
return listeners;
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java
index 196b10777..8972ab1d7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSortListener.java
@@ -17,6 +17,9 @@
package org.apache.inlong.manager.service.mocks;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -32,6 +35,16 @@ public class MockRestartSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ return form.getGroupOperateType() == GroupOperateType.RESTART;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) {
return ListenerResult.success("Mock restart sort success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java
index 3ab440090..a6fa79114 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockRestartSourceListener.java
@@ -17,21 +17,34 @@
package org.apache.inlong.manager.service.mocks;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
/**
* Test class for listen restart source event.
*/
-public class MockRestartSourceListener implements DataSourceOperateListener {
+public class MockRestartSourceListener implements SourceOperateListener {
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ return form.getGroupOperateType() == GroupOperateType.RESTART;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) {
return ListenerResult.success("Mock restart source success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java
index 496efdc4b..f485c69bf 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSortListener.java
@@ -17,6 +17,9 @@
package org.apache.inlong.manager.service.mocks;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -32,6 +35,16 @@ public class MockStopSortListener implements SortOperateListener {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ return form.getGroupOperateType() == GroupOperateType.SUSPEND;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) {
return ListenerResult.success("Mock stop sort success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java
index 54fec8121..ae554323d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockStopSourceListener.java
@@ -17,21 +17,34 @@
package org.apache.inlong.manager.service.mocks;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
/**
* Test class for listen stop source event.
*/
-public class MockStopSourceListener implements DataSourceOperateListener {
+public class MockStopSourceListener implements SourceOperateListener {
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
}
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ return false;
+ }
+ GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ return form.getGroupOperateType() == GroupOperateType.SUSPEND;
+ }
+
@Override
public ListenerResult listen(WorkflowContext context) {
return ListenerResult.success("Mock stop source success");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java
index 33d5ac7ad..46e7a937b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/GroupTaskListenerFactoryTest.java
@@ -41,7 +41,7 @@ import java.util.List;
public class GroupTaskListenerFactoryTest extends ServiceBaseTest {
@Autowired
- GroupTaskListenerFactory groupTaskListenerFactory;
+ private GroupTaskListenerFactory groupTaskListenerFactory;
@Test
public void testGetQueueOperateListener() {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 5070d3ac5..416fd171d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -44,14 +44,11 @@ import org.apache.inlong.manager.workflow.definition.WorkflowTask;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.util.WorkflowBeanUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import java.util.List;
-
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -103,31 +100,29 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
* Mock the task listener factory
*/
public void mockTaskListenerFactory() {
- CreateTubeGroupTaskListener createTubeGroupTaskListener = mock(CreateTubeGroupTaskListener.class);
- when(createTubeGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createTubeGroupTaskListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
- when(createTubeGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
- taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupTaskListener);
-
- CreateTubeTopicTaskListener createTubeTopicTaskListener = mock(CreateTubeTopicTaskListener.class);
- when(createTubeTopicTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createTubeTopicTaskListener.name()).thenReturn(CreateTubeTopicTaskListener.class.getSimpleName());
- when(createTubeTopicTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
- taskListenerFactory.setCreateTubeTopicTaskListener(createTubeTopicTaskListener);
-
- CreatePulsarResourceTaskListener createPulsarResourceTaskListener = mock(
- CreatePulsarResourceTaskListener.class);
- when(createPulsarResourceTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createPulsarResourceTaskListener.name()).thenReturn(
- CreatePulsarResourceTaskListener.class.getSimpleName());
- when(createPulsarResourceTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
- taskListenerFactory.setCreatePulsarResourceTaskListener(createPulsarResourceTaskListener);
-
- CreatePulsarGroupTaskListener createPulsarGroupTaskListener = mock(CreatePulsarGroupTaskListener.class);
- when(createPulsarGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createPulsarGroupTaskListener.name()).thenReturn(CreatePulsarGroupTaskListener.class.getSimpleName());
- when(createPulsarGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
- taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupTaskListener);
+ CreateTubeGroupTaskListener createTubeGroupListener = mock(CreateTubeGroupTaskListener.class);
+ when(createTubeGroupListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+ when(createTubeGroupListener.name()).thenReturn(SinkResourceListener.class.getSimpleName());
+ when(createTubeGroupListener.event()).thenReturn(TaskEvent.COMPLETE);
+ taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupListener);
+
+ CreateTubeTopicTaskListener createTubeTopicListener = mock(CreateTubeTopicTaskListener.class);
+ when(createTubeTopicListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+ when(createTubeTopicListener.name()).thenReturn(CreateTubeTopicTaskListener.class.getSimpleName());
+ when(createTubeTopicListener.event()).thenReturn(TaskEvent.COMPLETE);
+ taskListenerFactory.setCreateTubeTopicTaskListener(createTubeTopicListener);
+
+ CreatePulsarResourceTaskListener createPulsarResourceListener = mock(CreatePulsarResourceTaskListener.class);
+ when(createPulsarResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+ when(createPulsarResourceListener.name()).thenReturn(CreatePulsarResourceTaskListener.class.getSimpleName());
+ when(createPulsarResourceListener.event()).thenReturn(TaskEvent.COMPLETE);
+ taskListenerFactory.setCreatePulsarResourceTaskListener(createPulsarResourceListener);
+
+ CreatePulsarGroupTaskListener createPulsarGroupListener = mock(CreatePulsarGroupTaskListener.class);
+ when(createPulsarGroupListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+ when(createPulsarGroupListener.name()).thenReturn(CreatePulsarGroupTaskListener.class.getSimpleName());
+ when(createPulsarGroupListener.event()).thenReturn(TaskEvent.COMPLETE);
+ taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupListener);
SinkResourceListener sinkResourceListener = mock(SinkResourceListener.class);
when(sinkResourceListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
@@ -138,7 +133,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
taskListenerFactory.clearListeners();
taskListenerFactory.init();
SortOperateListener mockOperateListener = createMockSortListener();
- taskListenerFactory.getSortOperateListeners().put(mockOperateListener, context -> true);
+ taskListenerFactory.getSortOperateListeners().add(mockOperateListener);
}
public SortOperateListener createMockSortListener() {
@@ -169,11 +164,6 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
WorkflowProcess process = context.getProcess();
WorkflowTask task = process.getTaskByName("InitMQ");
Assertions.assertTrue(task instanceof ServiceTask);
- Assertions.assertEquals(2, task.getNameToListenerMap().size());
-
- List<TaskEventListener> listeners = Lists.newArrayList(task.getNameToListenerMap().values());
- Assertions.assertTrue(listeners.get(0) instanceof CreatePulsarGroupTaskListener);
- Assertions.assertTrue(listeners.get(1) instanceof CreatePulsarResourceTaskListener);
// Integer processId = processResponse.getId();
// context = processService.continueProcess(processId, applicant, "continue process");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index e428fbe1c..31c74a2cb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -24,7 +24,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
- * Test class for ldefine process.
+ * Test class for the workflow definition process.
*/
public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
diff --git a/inlong-manager/manager-service/src/test/resources/log4j2.xml b/inlong-manager/manager-service/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..a6a161d3f
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/resources/log4j2.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration status="WARN" monitorInterval="30">
+ <Properties>
+ <property name="basePath">logs</property>
+ <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%20.20t] %-30.30C{1.}:%L - %m%n</property>
+ <property name="output_log_level">DEBUG</property>
+ <property name="all_fileName">${basePath}/manager-ut.log</property>
+ <property name="console_print_level">DEBUG</property>
+ </Properties>
+
+ <appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="${log_pattern}"/>
+ <follow>true</follow>
+ </Console>
+ <File name="AllFile" fileName="${all_fileName}">
+ <PatternLayout pattern="${log_pattern}"/>
+ </File>
+ </appenders>
+
+ <loggers>
+ <root level="${output_log_level}">
+ <appender-ref ref="Console"/>
+ <appender-ref ref="AllFile"/>
+ </root>
+ </loggers>
+</configuration>
\ No newline at end of file
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index 91c1060bf..8b784fa30 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.workflow.definition;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -45,14 +43,12 @@ import java.util.stream.Collectors;
@Slf4j
public class ServiceTask extends WorkflowTask {
- public static final Gson GSON = new GsonBuilder().create();
-
private static final Set<WorkflowAction> SUPPORTED_ACTIONS = ImmutableSet
.of(WorkflowAction.COMPLETE, WorkflowAction.CANCEL, WorkflowAction.TERMINATE);
private final AtomicBoolean isInit = new AtomicBoolean(false);
- private ServiceTaskListenerProvider<TaskEventListener> listenerProvider;
- private ServiceTaskType serviceTaskType;
+ private TaskListenerFactory listenerFactory;
+ private ServiceTaskType taskType;
@Override
public WorkflowAction defaultNextAction() {
@@ -91,14 +87,14 @@ public class ServiceTask extends WorkflowTask {
}
}
- @SneakyThrows
@Override
+ @SneakyThrows
public ServiceTask clone() {
ServiceTask serviceTask = new ServiceTask();
serviceTask.setName(this.getName());
serviceTask.setDisplayName(this.getDisplayName());
- serviceTask.addServiceTaskType(this.serviceTaskType);
- serviceTask.addListenerProvider(this.listenerProvider);
+ serviceTask.setServiceTaskType(this.taskType);
+ serviceTask.setListenerFactory(this.listenerFactory);
Map<WorkflowAction, List<ConditionNextElement>> cloneActionToNextElementMap = Maps.newHashMap();
this.getActionToNextElementMap().forEach(
(k, v) -> cloneActionToNextElementMap.put(k, v.stream().map(ele -> {
@@ -113,27 +109,35 @@ public class ServiceTask extends WorkflowTask {
return serviceTask;
}
- public void addListenerProvider(ServiceTaskListenerProvider<TaskEventListener> provider) {
- this.listenerProvider = provider;
+ /**
+ * Set the listener factory for the Service Task.
+ */
+ public void setListenerFactory(TaskListenerFactory factory) {
+ this.listenerFactory = factory;
}
- public void addServiceTaskType(ServiceTaskType type) {
- this.serviceTaskType = type;
+ /**
+ * Set the task type for the Service Task.
+ */
+ public void setServiceTaskType(ServiceTaskType type) {
+ this.taskType = type;
}
+ /**
+ * Initialize the listeners for the current Service Task.
+ */
public void initListeners(WorkflowContext workflowContext) {
if (isInit.compareAndSet(false, true)) {
- if (listenerProvider == null || serviceTaskType == null) {
+ if (listenerFactory == null || taskType == null) {
return;
}
- List<TaskEventListener> listeners = Lists.newArrayList(
- listenerProvider.get(workflowContext, serviceTaskType));
-
+ List<TaskEventListener> listeners = Lists.newArrayList(listenerFactory.get(workflowContext, taskType));
List<String> listenerNames = listeners.stream().map(EventListener::name).collect(Collectors.toList());
- log.info("ServiceTask:{} is init for listeners:{}", getName(), GSON.toJson(listenerNames));
+ log.info("ServiceTask: [{}] is init for listeners: {}", getName(), listenerNames);
addListeners(listeners);
} else {
- log.info("ServiceTask:{} is already init", getName());
+ log.warn("ServiceTask [{}] was already init", getName());
}
}
+
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskListenerProvider.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/TaskListenerFactory.java
similarity index 68%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskListenerProvider.java
rename to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/TaskListenerFactory.java
index e93c2e4f9..6966b9cc0 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskListenerProvider.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/TaskListenerFactory.java
@@ -20,11 +20,20 @@ package org.apache.inlong.manager.workflow.definition;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
+import java.util.List;
+
/**
- * An object capable of providing instances of TaskEventListener
+ * Factory interface that supplies TaskEventListener instances.
*/
-public interface ServiceTaskListenerProvider<T extends TaskEventListener> {
+public interface TaskListenerFactory {
- Iterable<T> get(WorkflowContext workflowContext, ServiceTaskType serviceTaskType);
+ /**
+ * Get the task event listeners from the given workflow context and the specified task type.
+ *
+ * @param workflowContext the workflow context
+ * @param taskType the task type
+ * @return list of the task event listeners
+ */
+ List<? extends TaskEventListener> get(WorkflowContext workflowContext, ServiceTaskType taskType);
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java
index 861e07f27..0029d3b5f 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/WorkflowTask.java
@@ -59,7 +59,7 @@ public abstract class WorkflowTask extends NextableElement {
* Get sync task event listener list.
*/
public List<TaskEventListener> listeners(TaskEvent taskEvent) {
- return this.listeners.getOrDefault(taskEvent, TaskEventListener.EMPTY_LIST);
+ return this.listeners.getOrDefault(taskEvent, TaskEventListener.EMPTY_LISTENERS);
}
/**
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventSelector.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventSelector.java
deleted file mode 100644
index 12ec1b80f..000000000
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/EventSelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.event;
-
-import org.apache.inlong.manager.workflow.WorkflowContext;
-
-/**
- * a selector allowing to decide which event is selected
- */
-public interface EventSelector {
-
- EventSelector SELECT_ANY = context -> true;
-
- boolean accept(WorkflowContext context);
-
-}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java
index 5df99e3cd..3e8f28475 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/QueueOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
/**
- * Listener of operate queue.
+ * Listener for Message Queue operations.
*/
public interface QueueOperateListener extends TaskEventListener {
QueueOperateListener DEFAULT_QUEUE_OPERATE_LISTENER = new QueueOperateListener() {
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java
index 9b6a7a2bc..2a3b711c6 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SinkOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
/**
- * Listener of operate sink.
+ * Listener for Sink operations.
*/
public interface SinkOperateListener extends TaskEventListener {
SinkOperateListener DEFAULT_SINK_OPERATE_LISTENER = new SinkOperateListener() {
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java
index 23b4585e2..3665069e0 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SortOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
/**
- * Listener of operate sort.
+ * Listener for Sort operations.
*/
public interface SortOperateListener extends TaskEventListener {
SortOperateListener DEFAULT_SORT_OPERATE_LISTENER = new SortOperateListener() {
+
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/DataSourceOperateListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SourceOperateListener.java
similarity index 85%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/DataSourceOperateListener.java
rename to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SourceOperateListener.java
index 69d7e4c79..8abb11f5c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/DataSourceOperateListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/SourceOperateListener.java
@@ -21,11 +21,12 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
/**
- * Listener of operate data source.
+ * Listener for Source operations.
*/
-public interface DataSourceOperateListener extends TaskEventListener {
+public interface SourceOperateListener extends TaskEventListener {
+
+ SourceOperateListener DEFAULT_SOURCE_OPERATE_LISTENER = new SourceOperateListener() {
- DataSourceOperateListener DEFAULT_SOURCE_OPERATE_LISTENER = new DataSourceOperateListener() {
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java
index 179328c91..36347531c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventListener.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.workflow.event.task;
import com.google.common.collect.Lists;
+import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventListener;
import java.util.List;
@@ -27,6 +28,19 @@ import java.util.List;
*/
public interface TaskEventListener extends EventListener<TaskEvent> {
- List<TaskEventListener> EMPTY_LIST = Lists.newArrayList();
+ /**
+ * Empty event listeners.
+ */
+ List<TaskEventListener> EMPTY_LISTENERS = Lists.newArrayList();
+
+ /**
+ * Whether the current listener needs to operate the workflow.
+ *
+ * @param context workflow context
+ * @return true if the current listener needs to operate the workflow, false if not
+ */
+ default boolean accept(WorkflowContext context) {
+ return true;
+ }
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index 91606db4e..e771db27f 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -17,32 +17,31 @@
package org.apache.inlong.manager.workflow.plugin;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
-import java.util.Map;
+import java.util.List;
/**
* Interface of process plugin.
*/
public interface ProcessPlugin extends Plugin {
- default Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
+ default List<SourceOperateListener> createSourceOperateListeners() {
return null;
}
- default Map<SinkOperateListener, EventSelector> createSinkOperateListeners() {
+ default List<SinkOperateListener> createSinkOperateListeners() {
return null;
}
- default Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
+ default List<QueueOperateListener> createQueueOperateListeners() {
return null;
}
- default Map<SortOperateListener, EventSelector> createSortOperateListeners() {
+ default List<SortOperateListener> createSortOperateListeners() {
return null;
}