You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/11 07:13:01 UTC

[incubator-inlong] branch master updated: [INLONG-3601][Manager] Should not create Hive resource when sink type is Kafka (#3602)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ee36fc1 [INLONG-3601][Manager] Should not create Hive resource when sink type is Kafka (#3602)
50ee36fc1 is described below

commit 50ee36fc1b16963f29969844a127e4d5e38cb14f
Author: healchow <he...@gmail.com>
AuthorDate: Mon Apr 11 15:12:55 2022 +0800

    [INLONG-3601][Manager] Should not create Hive resource when sink type is Kafka (#3602)
---
 .../manager/common/pojo/source/SourceRequest.java  |  2 +-
 .../plugin/eventselect/DeleteProcessSelector.java  | 22 ++++++++++++----------
 .../plugin/eventselect/RestartProcessSelector.java | 18 ++++++++++--------
 .../plugin/eventselect/StartupProcessSelector.java | 11 ++++++-----
 .../plugin/eventselect/SuspendProcessSelector.java | 18 ++++++++++--------
 ...r.java => CreateHiveSinkForStreamListener.java} |  2 +-
 ...leListener.java => CreateHiveSinkListener.java} |  9 ++++-----
 ...entSelector.java => HiveSinkEventSelector.java} | 18 +++++++++++-------
 .../service/thirdparty/mq/PulsarEventSelector.java | 15 ++++++++++++---
 .../service/thirdparty/mq/TubeEventSelector.java   | 11 ++++++-----
 .../thirdparty/sort/ZkDisabledEventSelector.java   | 14 ++++++++++++--
 .../thirdparty/sort/ZkEnabledEventSelector.java    | 10 +++++++++-
 .../workflow/ServiceTaskListenerFactory.java       | 10 +++++-----
 .../stream/CreateStreamWorkflowDefinition.java     |  4 ++--
 ...torTest.java => HiveSinkEventSelectorTest.java} | 10 +++++-----
 .../service/workflow/WorkflowServiceImplTest.java  | 14 +++++++-------
 16 files changed, 113 insertions(+), 75 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 6249a266c..ef062faa1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -69,7 +69,7 @@ public class SourceRequest {
     @ApiModelProperty("Snapshot of the source task")
     private String snapshot;
 
-    @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
+    @ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
     private String serializationType;
 
     @ApiModelProperty("Version")
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
index dc5f8626c..bcecc4c2d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
@@ -26,23 +26,25 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
 
 @Slf4j
 public class DeleteProcessSelector implements EventSelector {
+
     @SneakyThrows
     @Override
-    public boolean accept(WorkflowContext workflowContext) {
-        String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
-        log.info("inlongGroupId:{} enter deleteProcess listener", inlongGroupId);
-        ProcessForm processForm = workflowContext.getProcessForm();
-        if (processForm == null || !(processForm instanceof UpdateGroupProcessForm)) {
-            log.info("inlongGroupId:{} not add deleteProcess listener", inlongGroupId);
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof UpdateGroupProcessForm)) {
+            log.info("not add deleteProcess listener as the form was not UpdateGroup for groupId [{}]", groupId);
             return false;
         }
-        UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
-        boolean flag = updateGroupProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.DELETE;
+
+        UpdateGroupProcessForm updateProcessForm = (UpdateGroupProcessForm) processForm;
+        boolean flag = updateProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.DELETE;
         if (!flag) {
-            log.info("inlongGroupId:{} not add deleteProcess listener, not DELETE", inlongGroupId);
+            log.info("not add deleteProcess listener as the operate was not DELETE for groupId [{}]", groupId);
             return false;
         }
-        log.info("inlongGroupId:{} add deleteProcess listener", inlongGroupId);
+
+        log.info("add deleteProcess listener for groupId [{}]", groupId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
index ef210f339..9d27bc7c6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
@@ -26,23 +26,25 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
 
 @Slf4j
 public class RestartProcessSelector implements EventSelector {
+
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext workflowContext) {
-        String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
-        log.info("inlongGroupId:{} enter restartProcess listener", inlongGroupId);
         ProcessForm processForm = workflowContext.getProcessForm();
-        if (processForm == null || !(processForm instanceof UpdateGroupProcessForm)) {
-            log.info("inlongGroupId:{} not add restartProcess listener", inlongGroupId);
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof UpdateGroupProcessForm)) {
+            log.info("not add restartProcess listener as the form was not UpdateGroup for groupId [{}]", groupId);
             return false;
         }
-        UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
-        boolean flag = updateGroupProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.RESTART;
+
+        UpdateGroupProcessForm updateProcessForm = (UpdateGroupProcessForm) processForm;
+        boolean flag = updateProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.RESTART;
         if (!flag) {
-            log.info("inlongGroupId:{} not add restartProcess listener, not RESTART", inlongGroupId);
+            log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
             return false;
         }
-        log.info("inlongGroupId:{} add restartProcess listener", inlongGroupId);
+
+        log.info("add restartProcess listener for groupId [{}]", groupId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
index de0ab0914..eaa44f906 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
@@ -26,17 +26,18 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
 
 @Slf4j
 public class StartupProcessSelector implements EventSelector {
+
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext workflowContext) {
-        String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
-        log.info("inlongGroupId:{} enter startupProcess listener", inlongGroupId);
         ProcessForm processForm = workflowContext.getProcessForm();
-        if (processForm == null || !(processForm instanceof GroupResourceProcessForm)) {
-            log.info("inlongGroupId:{} not add startupProcess listener", inlongGroupId);
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            log.info("not add startupProcess listener as the form was not GroupResource for groupId [{}]", groupId);
             return false;
         }
-        log.info("inlongGroupId:{} add startupProcess listener", inlongGroupId);
+
+        log.info("add startupProcess listener for groupId [{}]", groupId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
index 16c0f91da..5df1cff53 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
@@ -26,23 +26,25 @@ import org.apache.inlong.manager.workflow.event.EventSelector;
 
 @Slf4j
 public class SuspendProcessSelector implements EventSelector {
+
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext workflowContext) {
-        String inlongGroupId = workflowContext.getProcessForm().getInlongGroupId();
-        log.info("inlongGroupId:{} enter suspendProcess listener", inlongGroupId);
         ProcessForm processForm = workflowContext.getProcessForm();
-        if (processForm == null || !(processForm instanceof UpdateGroupProcessForm)) {
-            log.info("inlongGroupId:{} not add suspendProcess listener", inlongGroupId);
+        String groupId = processForm.getInlongGroupId();
+        if (!(processForm instanceof UpdateGroupProcessForm)) {
+            log.info("not add suspendProcess listener as the form was not UpdateGroup for groupId [{}]", groupId);
             return false;
         }
-        UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
-        boolean flag = updateGroupProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.SUSPEND;
+
+        UpdateGroupProcessForm updateProcessForm = (UpdateGroupProcessForm) processForm;
+        boolean flag = updateProcessForm.getOperateType() == UpdateGroupProcessForm.OperateType.SUSPEND;
         if (!flag) {
-            log.info("inlongGroupId:{} not add suspendProcess listener, not SUSPEND", inlongGroupId);
+            log.info("not add suspendProcess listener as the operate was not SUSPEND for groupId [{}]", groupId);
             return false;
         }
-        log.info("inlongGroupId:{} add suspendProcess listener", inlongGroupId);
+
+        log.info("add suspendProcess listener for groupId [{}]", groupId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkForStreamListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableForStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkForStreamListener.java
index 38d59b73d..b4d16b823 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkForStreamListener.java
@@ -37,7 +37,7 @@ import lombok.extern.slf4j.Slf4j;
  */
 @Service
 @Slf4j
-public class CreateHiveTableForStreamListener implements SinkOperateListener {
+public class CreateHiveSinkForStreamListener implements SinkOperateListener {
 
     @Autowired
     private StreamSinkEntityMapper sinkMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkListener.java
index 984161bba..9be1bc60c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveSinkListener.java
@@ -17,9 +17,7 @@
 
 package org.apache.inlong.manager.service.thirdparty.hive;
 
-import java.util.List;
-import java.util.stream.Collectors;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.sink.SinkForSortDTO;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
@@ -31,14 +29,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Event listener for creating hive tables for all inlong streams
  */
 @Service
 @Slf4j
-public class CreateHiveTableListener implements SinkOperateListener {
+public class CreateHiveSinkListener implements SinkOperateListener {
 
     @Autowired
     private StreamSinkEntityMapper sinkMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelector.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelector.java
index 214d5f0f2..1e962e1e2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelector.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
 
 @Component
 @Slf4j
-public class CreateHiveTableEventSelector implements EventSelector {
+public class HiveSinkEventSelector implements EventSelector {
 
     @Autowired
     private StreamSinkService sinkService;
@@ -49,23 +49,27 @@ public class CreateHiveTableEventSelector implements EventSelector {
         if (!(processForm instanceof GroupResourceProcessForm)) {
             return false;
         }
+
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        String groupId = form.getInlongGroupId();
         if (form.getGroupInfo() == null || StringUtils.isEmpty(form.getGroupInfo().getInlongGroupId())) {
+            log.info("not add create hive table listener as the info was null for groupId [{}]", groupId);
             return false;
         }
-        String groupId = form.getInlongGroupId();
-        List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
+        List<String> streamWithHiveSink = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
                 streamMapper.selectByGroupId(groupId)
                         .stream()
                         .map(InlongStreamEntity::getInlongStreamId)
                         .collect(Collectors.toList()));
 
-        if (CollectionUtils.isEmpty(dsForHive)) {
-            log.warn("groupId={} streamId={} does not have sink, skip to create hive table ",
+        if (CollectionUtils.isEmpty(streamWithHiveSink)) {
+            log.warn("skip to create hive table as no hive sink found for groupId={} streamId={}",
                     groupId, form.getInlongStreamId());
-            return true;
+            return false;
         }
-        return false;
+
+        log.info("add create hive table listener for groupId [{}]", groupId);
+        return true;
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
index 5d4289b05..c33301a67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
@@ -34,14 +34,23 @@ public class PulsarEventSelector implements EventSelector {
         if (!(processForm instanceof GroupResourceProcessForm)) {
             return false;
         }
+
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        String groupId = form.getInlongGroupId();
         MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
         if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo();
-            return pulsarInfo.getEnableCreateResource() == 1;
+            boolean enable = pulsarInfo.getEnableCreateResource() == 1;
+            if (enable) {
+                log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
+                return true;
+            } else {
+                log.info("skip to create pulsar resource as the createResource was false for groupId [{}]", groupId);
+                return false;
+            }
         }
-        log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
-                form.getInlongGroupId(), mqType);
+
+        log.warn("skip to create pulsar subscription as the mq type is {} for groupId [{}]", mqType, groupId);
         return false;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
index 3b5e71383..0f35fffb5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.thirdparty.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -35,12 +34,14 @@ public class TubeEventSelector implements EventSelector {
             return false;
         }
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        if (MQType.forType(groupInfo.getMiddlewareType()) == MQType.TUBE) {
+        String groupId = form.getInlongGroupId();
+        MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+        if (mqType == MQType.TUBE) {
+            log.info("need to create tube resource for groupId [{}]", groupId);
             return true;
         }
-        log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
-                groupInfo.getMiddlewareType(), form.getInlongGroupId());
+
+        log.warn("skip to to create tube resource as the mq type is {} for groupId [{}]", mqType, groupId);
         return false;
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index 9686c8db4..ffc376ede 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -29,23 +30,32 @@ import org.springframework.stereotype.Component;
 /**
  * Event selector for whether ZooKeeper is disabled.
  */
+@Slf4j
 @Component
 public class ZkDisabledEventSelector implements EventSelector {
 
     @Override
     public boolean accept(WorkflowContext context) {
         ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
         if (processForm instanceof GroupResourceProcessForm) {
             GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
             InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-            return groupInfo.getZookeeperEnabled() == 0
+
+            boolean enable = groupInfo.getZookeeperEnabled() == 0
                     && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+            log.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
+            return enable;
         } else if (processForm instanceof UpdateGroupProcessForm) {
             UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
             InlongGroupInfo groupInfo = updateGroupProcessForm.getGroupInfo();
-            return groupInfo.getZookeeperEnabled() == 0
+
+            boolean enable = groupInfo.getZookeeperEnabled() == 0
                     && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+            log.info("zookeeper disabled was [{}] for groupId [{}]", enable, groupId);
+            return enable;
         } else {
+            log.info("zk disabled for groupId [{}]", groupId);
             return false;
         }
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index 05e2b8062..057dfb83e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -28,18 +29,25 @@ import org.springframework.stereotype.Component;
 /**
  * Event selector for whether ZooKeeper is enabled.
  */
+@Slf4j
 @Component
 public class ZkEnabledEventSelector implements EventSelector {
 
     @Override
     public boolean accept(WorkflowContext context) {
         ProcessForm processForm = context.getProcessForm();
+        String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
+            log.info("zookeeper enabled was [false] for groupId [{}]", groupId);
             return false;
         }
+
         GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
         InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-        return groupInfo.getZookeeperEnabled() == 1 && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+        boolean enable =
+                groupInfo.getZookeeperEnabled() == 1 && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
+        log.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
+        return enable;
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
index 1c1d308a7..fe9ca4b70 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactory.java
@@ -26,8 +26,8 @@ import org.apache.inlong.manager.service.source.listener.SourceRestartEventSelec
 import org.apache.inlong.manager.service.source.listener.SourceRestartListener;
 import org.apache.inlong.manager.service.source.listener.SourceStopEventSelector;
 import org.apache.inlong.manager.service.source.listener.SourceStopListener;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableEventSelector;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdparty.hive.HiveSinkEventSelector;
+import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupTaskListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarResourceTaskListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreateTubeGroupTaskListener;
@@ -97,9 +97,9 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
 
     @Autowired
     @Setter
-    private CreateHiveTableListener createHiveTableListener;
+    private CreateHiveSinkListener createHiveSinkListener;
     @Autowired
-    private CreateHiveTableEventSelector createHiveTableEventSelector;
+    private HiveSinkEventSelector hiveSinkEventSelector;
 
     @Autowired
     @Setter
@@ -119,7 +119,7 @@ public class ServiceTaskListenerFactory implements PluginBinder, ServiceTaskList
         sourceOperateListeners.put(sourceDeleteListener, new SourceDeleteEventSelector());
         sourceOperateListeners.put(sourceRestartListener, new SourceRestartEventSelector());
         sinkOperateListeners = new LinkedHashMap<>();
-        sinkOperateListeners.put(createHiveTableListener, createHiveTableEventSelector);
+        sinkOperateListeners.put(createHiveSinkListener, hiveSinkEventSelector);
         queueOperateListeners = new LinkedHashMap<>();
         queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
         queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 373f0eea7..5cb5b1888 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableForStreamListener;
+import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkForStreamListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupForStreamTaskListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarTopicForStreamTaskListener;
 import org.apache.inlong.manager.service.thirdparty.sort.PushSortConfigListener;
@@ -56,7 +56,7 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
     @Autowired
     private StreamCompleteProcessListener streamCompleteProcessListener;
     @Autowired
-    private CreateHiveTableForStreamListener createHiveTableListener;
+    private CreateHiveSinkForStreamListener createHiveTableListener;
     @Autowired
     private PushSortConfigListener pushSortConfigListener;
     @Autowired
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelectorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelectorTest.java
similarity index 81%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelectorTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelectorTest.java
index 61a37d342..a9ab05be7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelectorTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/hive/HiveSinkEventSelectorTest.java
@@ -25,21 +25,21 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-public class CreateHiveTableEventSelectorTest extends ServiceBaseTest {
+public class HiveSinkEventSelectorTest extends ServiceBaseTest {
 
     @Autowired
-    CreateHiveTableEventSelector createHiveTableEventSelector;
+    HiveSinkEventSelector hiveSinkEventSelector;
 
     @Test
     public void testAccept() {
         WorkflowContext workflowContext = new WorkflowContext();
         GroupResourceProcessForm processForm = new GroupResourceProcessForm();
         workflowContext.setProcessForm(processForm);
-        Assert.assertFalse(createHiveTableEventSelector.accept(workflowContext));
+        Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
         processForm.setGroupInfo(new InlongGroupInfo());
-        Assert.assertFalse(createHiveTableEventSelector.accept(workflowContext));
+        Assert.assertFalse(hiveSinkEventSelector.accept(workflowContext));
         processForm.getGroupInfo().setInlongGroupId("test");
-        Assert.assertTrue(createHiveTableEventSelector.accept(workflowContext));
+        Assert.assertTrue(hiveSinkEventSelector.accept(workflowContext));
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index a2347eb29..8ede70759 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -44,7 +44,7 @@ import org.apache.inlong.manager.service.mocks.MockDeleteSortListener;
 import org.apache.inlong.manager.service.mocks.MockPlugin;
 import org.apache.inlong.manager.service.mocks.MockRestartSortListener;
 import org.apache.inlong.manager.service.mocks.MockStopSortListener;
-import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupTaskListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarResourceTaskListener;
 import org.apache.inlong.manager.service.thirdparty.mq.CreateTubeGroupTaskListener;
@@ -188,7 +188,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
     public void mockTaskListenerFactory() {
         CreateTubeGroupTaskListener createTubeGroupTaskListener = mock(CreateTubeGroupTaskListener.class);
         when(createTubeGroupTaskListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createTubeGroupTaskListener.name()).thenReturn(CreateHiveTableListener.class.getSimpleName());
+        when(createTubeGroupTaskListener.name()).thenReturn(CreateHiveSinkListener.class.getSimpleName());
         when(createTubeGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
         taskListenerFactory.setCreateTubeGroupTaskListener(createTubeGroupTaskListener);
 
@@ -212,11 +212,11 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
         when(createPulsarGroupTaskListener.event()).thenReturn(TaskEvent.COMPLETE);
         taskListenerFactory.setCreatePulsarGroupTaskListener(createPulsarGroupTaskListener);
 
-        CreateHiveTableListener createHiveTableListener = mock(CreateHiveTableListener.class);
-        when(createHiveTableListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
-        when(createHiveTableListener.name()).thenReturn(CreateHiveTableListener.class.getSimpleName());
-        when(createHiveTableListener.event()).thenReturn(TaskEvent.COMPLETE);
-        taskListenerFactory.setCreateHiveTableListener(createHiveTableListener);
+        CreateHiveSinkListener createHiveSinkListener = mock(CreateHiveSinkListener.class);
+        when(createHiveSinkListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());
+        when(createHiveSinkListener.name()).thenReturn(CreateHiveSinkListener.class.getSimpleName());
+        when(createHiveSinkListener.event()).thenReturn(TaskEvent.COMPLETE);
+        taskListenerFactory.setCreateHiveSinkListener(createHiveSinkListener);
 
         PushSortConfigListener pushSortConfigListener = mock(PushSortConfigListener.class);
         when(pushSortConfigListener.listen(any(WorkflowContext.class))).thenReturn(ListenerResult.success());