You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/11 07:13:01 UTC
[incubator-inlong] branch master updated: [INLONG-3601][Manager] Should not create Hive resource when sink type is Kafka (#3602)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 50ee36fc1 [INLONG-3601][Manager] Should not create Hive resource when sink type is Kafka (#3602)
50ee36fc1 is described below
commit 50ee36fc1b16963f29969844a127e4d5e38cb14f
Author: healchow <he...@gmail.com>
AuthorDate: Mon Apr 11 15:12:55 2022 +0800
[INLONG-3601][Manager] Should not create Hive resource when sink type is Kafka (#3602)
---
.../manager/common/pojo/source/SourceRequest.java | 2 +-
.../plugin/eventselect/DeleteProcessSelector.java | 22 ++++++++++++----------
.../plugin/eventselect/RestartProcessSelector.java | 18 ++++++++++--------
.../plugin/eventselect/StartupProcessSelector.java | 11 ++++++-----
.../plugin/eventselect/SuspendProcessSelector.java | 18 ++++++++++--------
...r.java => CreateHiveSinkForStreamListener.java} | 2 +-
...leListener.java => CreateHiveSinkListener.java} | 9 ++++-----
...entSelector.java => HiveSinkEventSelector.java} | 18 +++++++++++-------
.../service/thirdparty/mq/PulsarEventSelector.java | 15 ++++++++++++---
.../service/thirdparty/mq/TubeEventSelector.java | 11 ++++++-----
.../thirdparty/sort/ZkDisabledEventSelector.java | 14 ++++++++++++--
.../thirdparty/sort/ZkEnabledEventSelector.java | 10 +++++++++-
.../workflow/ServiceTaskListenerFactory.java | 10 +++++-----
.../stream/CreateStreamWorkflowDefinition.java | 4 ++--
...torTest.java => HiveSinkEventSelectorTest.java} | 10 +++++-----
.../service/workflow/WorkflowServiceImplTest.java | 14 +++++++-------
16 files changed, 113 insertions(+), 75 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 6249a266c..ef062faa1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -69,7 +69,7 @@ public class SourceRequest {
@ApiModelProperty("Snapshot of the source task")
private String snapshot;
- @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
+ @ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
private String serializationType;
@ApiModelProperty("Version")
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
index dc5f8626c..bcecc4c2d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
@@ -26,23 +26,25 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
@Slf4j
public class DeleteProcessSelector implements EventSelector {
+
@SneakyThrows
@Override
- public boolean accept(WorkflowContext workflowContext) {
- String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
- log.info("inlongGroupId:{} enter deleteProcess listener", inlongGroupId);
- ProcessForm processForm = workflowContext.getProcessForm();
- if (processForm == null || !(processForm instanceof UpdateGroupProcessForm)) {
- log.info("inlongGroupId:{} not add deleteProcess listener", inlongGroupId);
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof UpdateGroupProcessForm)) {
+ log.info("not add deleteProcess listener as the form was not UpdateGroup for groupId [{}]", groupId);
return false;
}
- UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
- boolean flag = updateGroupProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.DELETE;
+
+ UpdateGroupProcessForm updateProcessForm = (UpdateGroupProcessForm) processForm;
+ boolean flag = updateProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.DELETE;
if (!flag) {
- log.info("inlongGroupId:{} not add deleteProcess listener, not DELETE", inlongGroupId);
+ log.info("not add deleteProcess listener as the operate was not DELETE for groupId [{}]", groupId);
return false;
}
- log.info("inlongGroupId:{} add deleteProcess listener", inlongGroupId);
+
+ log.info("add deleteProcess listener for groupId [{}]", groupId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
index ef210f339..9d27bc7c6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
@@ -26,23 +26,25 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
@Slf4j
public class RestartProcessSelector implements EventSelector {
+
@SneakyThrows
@Override
public boolean accept(WorkflowContext workflowContext) {
- String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
- log.info("inlongGroupId:{} enter restartProcess listener", inlongGroupId);
ProcessForm processForm = workflowContext.getProcessForm();
- if (processForm == null || !(processForm instanceof UpdateGroupProcessForm)) {
- log.info("inlongGroupId:{} not add restartProcess listener", inlongGroupId);
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof UpdateGroupProcessForm)) {
+ log.info("not add restartProcess listener as the form was not UpdateGroup for groupId [{}]", groupId);
return false;
}
- UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
- boolean flag = updateGroupProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.RESTART;
+
+ UpdateGroupProcessForm updateProcessForm = (UpdateGroupProcessForm) processForm;
+ boolean flag = updateProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.RESTART;
if (!flag) {
- log.info("inlongGroupId:{} not add restartProcess listener, not RESTART", inlongGroupId);
+ log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
return false;
}
- log.info("inlongGroupId:{} add restartProcess listener", inlongGroupId);
+
+ log.info("add restartProcess listener for groupId [{}]", groupId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
index de0ab0914..eaa44f906 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
@@ -26,17 +26,18 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
@Slf4j
public class StartupProcessSelector implements EventSelector {
+
@SneakyThrows
@Override
public boolean accept(WorkflowContext workflowContext) {
- String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
- log.info("inlongGroupId:{} enter startupProcess listener", inlongGroupId);
ProcessForm processForm = workflowContext.getProcessForm();
- if (processForm == null || !(processForm instanceof GroupResourceProcessForm)) {
- log.info("inlongGroupId:{} not add startupProcess listener", inlongGroupId);
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof GroupResourceProcessForm)) {
+ log.info("not add startupProcess listener as the form was not GroupResource for groupId [{}]", groupId);
return false;
}
- log.info("inlongGroupId:{} add startupProcess listener", inlongGroupId);
+
+ log.info("add startupProcess listener for groupId [{}]", groupId);
return true;
}
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
index 16c0f91da..5df1cff53 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
@@ -26,23 +26,25 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
@Slf4j
public class SuspendProcessSelector implements EventSelector {
+
@SneakyThrows
@Override
public boolean accept(WorkflowContext workflowContext) {
- String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
- log.info("inlongGroupId:{} enter suspendProcess listener", inlongGroupId);
ProcessForm processForm = workflowContext.getProcessForm();
- if (processForm == null || !(processForm instanceof UpdateGroupProcessForm)) {
- log.info("inlongGroupId:{} not add suspendProcess listener", inlongGroupId);
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof UpdateGroupProcessForm)) {
+ log.info("not add suspendProcess listener as the form was not UpdateGroup for groupId [{}]", groupId);
return false;
}
- UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
- boolean flag = updateGroupProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.SUSPEND;
+
+ UpdateGroupProcessForm updateProcessForm = (UpdateGroupProcessForm) processForm;
+ boolean flag = updateProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.SUSPEND;
if (!flag) {
- log.info("inlongGroupId:{} not add suspendProcess listener, not SUSPEND", inlongGroupId);
+ log.info("not add suspendProcess listener as the operate was not SUSPEND for groupId [{}]", groupId);
return false;
}
- log.info("inlongGroupId:{} add suspendProcess listener", inlongGroupId);
+
+ log.info("add suspendProcess listener for groupId [{}]", groupId);
return true;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkForStreamListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableForStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkForStreamListener.java
index 38d59b73d..b4d16b823 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkForStreamListener.java
@@ -37,7 +37,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Service
@Slf4j
-public class CreateHiveTableForStreamListener implements SinkOperateListener {
+public class CreateHiveSinkForStreamListener implements SinkOperateListener {
@Autowired
private StreamSinkEntityMapper sinkMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkListener.java
index 984161bba..9be1bc60c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkListener.java
@@ -17,9 +17,7 @@
package org.apache.inlong.manager.service.thirdparty.hive;
-import java.util.List;
-import java.util.stream.Collectors;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.sink.SinkForSortDTO;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
@@ -31,14 +29,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* Event listener of create hive table for all inlong stream
*/
@Service
@Slf4j
-public class CreateHiveTableListener implements SinkOperateListener {
+public class CreateHiveSinkListener implements SinkOperateListener {
@Autowired
private StreamSinkEntityMapper sinkMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelector.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelector.java
index 214d5f0f2..1e962e1e2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelector.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
@Component
@Slf4j
-public class CreateHiveTableEventSelector implements EventSelector {
+public class HiveSinkEventSelector implements EventSelector {
@Autowired
private StreamSinkService sinkService;
@@ -49,23 +49,27 @@ public class CreateHiveTableEventSelector implements EventSelector {
if (!(processForm instanceof GroupResourceProcessForm)) {
return false;
}
+
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ String groupId = form.getInlongGroupId();
if (form.getGroupInfo() == null || StringUtils.isEmpty(form.getGroupInfo().getInlongGroupId())) {
+ log.info("not add create hive table listener as the info was null for groupId [{}]", groupId);
return false;
}
- String groupId = form.getInlongGroupId();
- List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
+ List<String> streamWithHiveSink = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
streamMapper.selectByGroupId(groupId)
.stream()
.map(InlongStreamEntity::getInlongStreamId)
.collect(Collectors.toList()));
- if (CollectionUtils.isEmpty(dsForHive)) {
- log.warn("groupId={} streamId={} does not have sink, skip to create hive table ",
+ if (CollectionUtils.isEmpty(streamWithHiveSink)) {
+ log.warn("skip to create hive table as no hive sink found for groupId={} streamId={}",
groupId, form.getInlongStreamId());
- return true;
+ return false;
}
- return false;
+
+ log.info("add create hive table listener for groupId [{}]", groupId);
+ return true;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
index 5d4289b05..c33301a67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
@@ -34,14 +34,23 @@ public class PulsarEventSelector implements EventSelector {
if (!(processForm instanceof GroupResourceProcessForm)) {
return false;
}
+
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ String groupId = form.getInlongGroupId();
MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo();
- return pulsarInfo.getEnableCreateResource() == 1;
+ boolean enable = pulsarInfo.getEnableCreateResource() == 1;
+ if (enable) {
+ log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
+ return true;
+ } else {
+ log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
+ return false;
+ }
}
- log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
- form.getInlongGroupId(), mqType);
+
+ log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
return false;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
index 3b5e71383..0f35fffb5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.thirdparty.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -35,12 +34,14 @@ public class TubeEventSelector implements EventSelector {
return false;
}
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = form.getGroupInfo();
- if (MQType.forType(groupInfo.getMiddlewareType()) == MQType.TUBE) {
+ String groupId = form.getInlongGroupId();
+ MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+ if (mqType == MQType.TUBE) {
+ log.info("need to create tube resource for groupId [{}]", groupId);
return true;
}
- log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
- groupInfo.getMiddlewareType(), form.getInlongGroupId());
+
+ log.warn("skip to to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
return false;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index 9686c8db4..ffc376ede 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.thirdparty.sort;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -29,23 +30,32 @@ import org.springframework.stereotype.Component;
/**
* Event selector for whether ZooKeeper is disabled.
*/
+@Slf4j
@Component
public class ZkDisabledEventSelector implements EventSelector {
@Override
public boolean accept(WorkflowContext context) {
ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
if (processForm instanceof GroupResourceProcessForm) {
GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 0
+
+ boolean enable = groupInfo.getZookeeperEnabled() == 0
&& MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+ log.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
+ return enable;
} else if (processForm instanceof UpdateGroupProcessForm) {
UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
InlongGroupInfo groupInfo = updateGroupProcessForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 0
+
+ boolean enable = groupInfo.getZookeeperEnabled() == 0
&& MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+ log.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
+ return enable;
} else {
+ log.info("zk disabled for groupId [{}]", groupId);
return false;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index 05e2b8062..057dfb83e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.thirdparty.sort;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -28,18 +29,25 @@ import org.springframework.stereotype.Component;
/**
* Event selector for whether ZooKeeper is enabled.
*/
+@Slf4j
@Component
public class ZkEnabledEventSelector implements EventSelector {
@Override
public boolean accept(WorkflowContext context) {
ProcessForm processForm = context.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
if (!(processForm instanceof GroupResourceProcessForm)) {
+ log.info("zookeeper enabled was [false] for groupId [{}]", groupId);
return false;
}
+
GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 1 && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+ boolean enable =
+ groupInfo.getZookeeperEnabled() == 1 && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+ log.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
+ return enable;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
index 1c1d308a7..fe9ca4b70 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
@@ -26,8 +26,8 @@ import org.apache.inlong.manager.service.source.listener.SourceRestartEventSelec
import org.apache.inlong.manager.service.source.listener.SourceRestartListener;
import org.apache.inlong.manager.service.source.listener.SourceStopEventSelector;
import org.apache.inlong.manager.service.source.listener.SourceStopListener;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableEventSelector;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdparty.hive.HiveSinkEventSelector;
+import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupTaskListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarResourceTaskListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreateTubeGroupTaskListener;
@@ -97,9 +97,9 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
@Autowired
@Setter
- private CreateHiveTableListener createHiveTableListener;
+ private CreateHiveSinkListener createHiveSinkListener;
@Autowired
- private CreateHiveTableEventSelector createHiveTableEventSelector;
+ private HiveSinkEventSelector hiveSinkEventSelector;
@Autowired
@Setter
@@ -119,7 +119,7 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
sourceOperateListeners.put(sourceDeleteListener, new SourceDeleteEventSelector());
sourceOperateListeners.put(sourceRestartListener, new SourceRestartEventSelector());
sinkOperateListeners = new LinkedHashMap<>();
- sinkOperateListeners.put(createHiveTableListener, createHiveTableEventSelector);
+ sinkOperateListeners.put(createHiveSinkListener, hiveSinkEventSelector);
queueOperateListeners = new LinkedHashMap<>();
queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 373f0eea7..5cb5b1888 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableForStreamListener;
+import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkForStreamListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupForStreamTaskListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarTopicForStreamTaskListener;
import org.apache.inlong.manager.service.thirdparty.sort.PushSortConfigListener;
@@ -56,7 +56,7 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
@Autowired
private StreamCompleteProcessListener streamCompleteProcessListener;
@Autowired
- private CreateHiveTableForStreamListener createHiveTableListener;
+ private CreateHiveSinkForStreamListener createHiveTableListener;
@Autowired
private PushSortConfigListener pushSortConfigListener;
@Autowired
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelectorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelectorTest.java
similarity index 81%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelectorTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelectorTest.java
index 61a37d342..a9ab05be7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelectorTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelectorTest.java
@@ -25,21 +25,21 @@ import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
-public class CreateHiveTableEventSelectorTest extends ServiceBaseTest {
+public class HiveSinkEventSelectorTest extends ServiceBaseTest {
@Autowired
- CreateHiveTableEventSelector createHiveTableEventSelector;
+ HiveSinkEventSelector hiveSinkEventSelector;
@Test
public void testAccept() {
WorkflowContext workflowContext = new WorkflowContext();
GroupResourceProcessForm processForm = new GroupResourceProcessForm();
workflowContext.setProcessForm(processForm);
- Assert.assertFalse(createHiveTableEventSelector.accept(workflowContext));
+ Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
processForm.setGroupInfo(new InlongGroupInfo());
- Assert.assertFalse(createHiveTableEventSelector.accept(workflowContext));
+ Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
processForm.getGroupInfo().setInlongGroupId("test");
- Assert.assertTrue(createHiveTableEventSelector.accept(workflowContext));
+ Assert.assertTrue(hiveSinkEventSelector.accept(workflowContext));
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index a2347eb29..8ede70759 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -44,7 +44,7 @@ import org.apache.inlong.manager.service.mocks.MockDeleteSortListener;
import org.apache.inlong.manager.service.mocks.MockPlugin;
import org.apache.inlong.manager.service.mocks.MockRestartSortListener;
import org.apache.inlong.manager.service.mocks.MockStopSortListener;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupTaskListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarResourceTaskListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreateTubeGroupTaskListener;
@@ -188,7 +188,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
public void mockTaskListenerFactory() {
CreateTubeGroupTaskListener createTubeGroupTaskListener = mock(CreateTubeGroupTaskListener.class);
when(createTubeGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createTubeGroupTaskListener.name()).thenReturn(CreateHiveTableListener.class.getSimpleName());
+ when(createTubeGroupTaskListener.name()).thenReturn(CreateHiveSinkListener.class.getSimpleName());
when(createTubeGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupTaskListener);
@@ -212,11 +212,11 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
when(createPulsarGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupTaskListener);
- CreateHiveTableListener createHiveTableListener = mock(CreateHiveTableListener.class);
- when(createHiveTableListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
- when(createHiveTableListener.name()).thenReturn(CreateHiveTableListener.class.getSimpleName());
- when(createHiveTableListener.event()).thenReturn(TaskEvent.COMPLETE);
- taskListenerFactory.setCreateHiveTableListener(createHiveTableListener);
+ CreateHiveSinkListener createHiveSinkListener = mock(CreateHiveSinkListener.class);
+ when(createHiveSinkListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+ when(createHiveSinkListener.name()).thenReturn(CreateHiveSinkListener.class.getSimpleName());
+ when(createHiveSinkListener.event()).thenReturn(TaskEvent.COMPLETE);
+ taskListenerFactory.setCreateHiveSinkListener(createHiveSinkListener);
PushSortConfigListener pushSortConfigListener = mock(PushSortConfigListener.class);
when(pushSortConfigListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());