You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/26 12:51:28 UTC

[incubator-inlong] branch master updated: [INLONG-2317] Support Plugin in Inlong Manager (#2319)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc4b37   [INLONG-2317] Support Plugin in Inlong Manager (#2319)
6bc4b37 is described below

commit 6bc4b379611ba0af7fefd19ee137f44aa6f2b47d
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Jan 26 20:51:23 2022 +0800

     [INLONG-2317] Support Plugin in Inlong Manager (#2319)
---
 .../manager/common/event/EventSelector.java}       |  18 +--
 .../event/task/DataSourceOperateListener.java}     |  33 ++--
 .../common/event/task/QueueOperateListener.java}   |  33 ++--
 .../common/event/task/SortOperateListener.java}    |  32 ++--
 .../common/event/task/StorageOperateListener.java} |  32 ++--
 .../inlong/manager/common/plugin/Plugin.java}      |  13 +-
 .../manager/common/plugin/PluginBinder.java}       |  13 +-
 .../manager/common/plugin/PluginDefinition.java}   |  37 +++--
 .../manager/common/plugin/ProcessPlugin.java       |  45 ++++++
 ...fgiEntity.java => SortClusterConfigEntity.java} |   2 +-
 .../dao/mapper/SortClusterConfgiEntityMapper.java  |  14 +-
 .../mappers/SortClusterConfgiEntityMapper.xml      |  10 +-
 inlong-manager/manager-plugin-examples/pom.xml     |  56 +++++++
 .../inlong/manager/plugin/EmptyProcessPlugin.java  |  50 ++++++
 .../src/main/resources/META-INF/plugin.yaml        |  23 +++
 inlong-manager/manager-service/pom.xml             |   4 +
 .../service/core/SortClusterConfigService.java     |   4 +-
 .../core/impl/SortClusterConfigServiceImpl.java    |   4 +-
 .../manager/service/core/impl/SortServiceImpl.java |   8 +-
 .../manager/service/core/plugin/JarHell.java       | 127 +++++++++++++++
 .../service/core/plugin/PluginClassLoader.java     | 177 +++++++++++++++++++++
 .../manager/service/core/plugin/PluginService.java |  94 +++++++++++
 .../hive/CreateHiveTableEventSelector.java         |  69 ++++++++
 .../hive/CreateHiveTableForStreamListener.java     |   4 +-
 .../thirdpart/hive/CreateHiveTableListener.java    |   4 +-
 .../mq/CreatePulsarGroupForStreamTaskListener.java |  12 +-
 .../mq/CreatePulsarGroupTaskListener.java          |  12 +-
 .../mq/CreatePulsarResourceTaskListener.java       |  12 +-
 .../mq/CreatePulsarTopicForStreamTaskListener.java |  12 +-
 .../thirdpart/mq/CreateTubeGroupTaskListener.java  |  12 +-
 .../thirdpart/mq/CreateTubeTopicTaskListener.java  |  12 +-
 .../service/thirdpart/mq/PulsarEventSelector.java  |  40 +++++
 .../service/thirdpart/mq/TubeEventSelector.java    |  41 +++++
 .../thirdpart/sort/PushHiveConfigTaskListener.java |  12 +-
 .../thirdpart/sort/ZkSortEventSelector.java}       |  23 +--
 .../service/workflow/TaskEventListenerFactory.java | 163 +++++++++++++++++++
 .../manager/service/{core => }/BaseConfig.java     |   2 +-
 .../manager/service/{core => }/BaseTest.java       |   3 +-
 .../{BaseConfig.java => plugin/JarHellTest.java}   |  24 +--
 .../service/core/plugin/PluginClassLoaderTest.java |  58 +++++++
 .../plugin/PluginServiceTest.java}                 |  25 +--
 .../hive/CreateHiveTableEventSelectorTest.java     |  45 ++++++
 .../workflow/TaskEventListenerFactoryTest.java     |  61 +++++++
 .../CreateBusinessWorkflowDefinitionTest.java      |   4 +-
 .../src/test/resources/application-test.properties |   8 +-
 .../resources/plugins/inlong-manager-plugin.jar    | Bin 0 -> 12277 bytes
 .../web/controller/openapi/SortControllerTest.java |  14 +-
 inlong-manager/pom.xml                             |   6 +-
 48 files changed, 1302 insertions(+), 205 deletions(-)

diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/EventSelector.java
similarity index 69%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/EventSelector.java
index 3a480eb..a8f66a9 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/EventSelector.java
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 
-@Configuration
-public class BaseConfig {
-    @Bean
-    public RestTemplate restTemplate() {
-        return new RestTemplate();
-    }
+/**
+ * A selector that decides which event is selected.
+ */
+public interface EventSelector {
+
+    boolean accept(WorkflowContext context);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/DataSourceOperateListener.java
similarity index 53%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/DataSourceOperateListener.java
index 75d75ac..768453c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/DataSourceOperateListener.java
@@ -15,21 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 
-import java.util.List;
+public interface DataSourceOperateListener extends TaskEventListener {
 
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+    DataSourceOperateListener DEFAULT_SOURCE_OPERATE_LISTENER = new DataSourceOperateListener() {
+        @Override
+        public TaskEvent event() {
+            return TaskEvent.COMPLETE;
+        }
+
+        @Override
+        public ListenerResult listen(WorkflowContext context) throws Exception {
+            return ListenerResult.success();
+        }
+
+        @Override
+        public boolean async() {
+            return false;
+        }
+    };
 
-    /**
-     * Select list of task by cluster name.
-     * @param clusterName Name of sort cluster.
-     * @return List of tasks, including task name and sink type.
-     */
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/QueueOperateListener.java
similarity index 54%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/QueueOperateListener.java
index 75d75ac..47b46c9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/QueueOperateListener.java
@@ -15,21 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 
-import java.util.List;
+public interface QueueOperateListener extends TaskEventListener {
 
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+    QueueOperateListener DEFAULT_QUEUE_OPERATE_LISTENER = new QueueOperateListener() {
+        @Override
+        public TaskEvent event() {
+            return TaskEvent.COMPLETE;
+        }
+
+        @Override
+        public ListenerResult listen(WorkflowContext context) throws Exception {
+            return ListenerResult.success();
+        }
+
+        @Override
+        public boolean async() {
+            return false;
+        }
+    };
 
-    /**
-     * Select list of task by cluster name.
-     * @param clusterName Name of sort cluster.
-     * @return List of tasks, including task name and sink type.
-     */
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/SortOperateListener.java
similarity index 54%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/SortOperateListener.java
index 75d75ac..78d3ce4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/SortOperateListener.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 
-import java.util.List;
+public interface SortOperateListener extends TaskEventListener {
 
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+    SortOperateListener DEFAULT_SORT_OPERATE_LISTENER = new SortOperateListener() {
+        @Override
+        public TaskEvent event() {
+            return TaskEvent.COMPLETE;
+        }
+
+        @Override
+        public ListenerResult listen(WorkflowContext context) throws Exception {
+            return ListenerResult.success();
+        }
 
-    /**
-     * Select list of task by cluster name.
-     * @param clusterName Name of sort cluster.
-     * @return List of tasks, including task name and sink type.
-     */
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+        @Override
+        public boolean async() {
+            return false;
+        }
+    };
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/StorageOperateListener.java
similarity index 53%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/StorageOperateListener.java
index 75d75ac..45a9eb6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/StorageOperateListener.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 
-import java.util.List;
+public interface StorageOperateListener extends TaskEventListener {
 
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+    StorageOperateListener DEFAULT_STORAGE_OPERATE_LISTENER = new StorageOperateListener() {
+        @Override
+        public TaskEvent event() {
+            return TaskEvent.COMPLETE;
+        }
+
+        @Override
+        public ListenerResult listen(WorkflowContext context) throws Exception {
+            return ListenerResult.success();
+        }
 
-    /**
-     * Select list of task by cluster name.
-     * @param clusterName Name of sort cluster.
-     * @return List of tasks, including task name and sink type.
-     */
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+        @Override
+        public boolean async() {
+            return false;
+        }
+    };
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
similarity index 69%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
index 3a480eb..a072c77 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
@@ -15,17 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.plugin;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
-
-@Configuration
-public class BaseConfig {
-    @Bean
-    public RestTemplate restTemplate() {
-        return new RestTemplate();
-    }
+public interface Plugin {
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
similarity index 69%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
index 3a480eb..2fff722 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
@@ -15,17 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.plugin;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+public interface PluginBinder {
 
-@Configuration
-public class BaseConfig {
-    @Bean
-    public RestTemplate restTemplate() {
-        return new RestTemplate();
-    }
+    void acceptPlugin(Plugin plugin);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
similarity index 54%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
index 75d75ac..874db28 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
@@ -15,21 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.plugin;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
-
-import java.util.List;
+import lombok.Data;
 
 /**
- * Sort cluster config service.
+ * pluginDefinition should be defined in *.jar/META-INF/plugin.yaml
+ * for example:
+ *    name: test
+ *    description: this plugin is used for testing
+ *    pluginClass: org.apache.inlong.plugin.testPlugin
+ *    javaVersion: 1.8 or 8
  */
-public interface SortClusterConfigService {
+@Data
+public class PluginDefinition {
+
+    /**
+     * name of plugin
+     */
+    private String name;
+
+    /**
+     * description of plugin to be used for user help
+     */
+    private String description;
+
+    /**
+     * Java version of the plugin, used for validity checks
+     */
+    private String javaVersion;
 
     /**
-     * Select list of task by cluster name.
-     * @param clusterName Name of sort cluster.
-     * @return List of tasks, including task name and sink type.
+     * the full class name of plugin
      */
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+    private String pluginClass;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/ProcessPlugin.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/ProcessPlugin.java
new file mode 100644
index 0000000..9192a5f
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/ProcessPlugin.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.plugin;
+
+import java.util.Map;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
+
+public interface ProcessPlugin extends Plugin {
+
+    default Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
+        return null;
+    }
+
+    default Map<StorageOperateListener, EventSelector> createStorageOperateListeners() {
+        return null;
+    }
+
+    default Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
+        return null;
+    }
+
+    default Map<SortOperateListener, EventSelector> createSortOperateListeners() {
+        return null;
+    }
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfgiEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfigEntity.java
similarity index 94%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfgiEntity.java
rename to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfigEntity.java
index ea2e2dd..f9f0319 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfgiEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfigEntity.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
 
 @Data
 @Builder
-public class SortClusterConfgiEntity implements Serializable {
+public class SortClusterConfigEntity implements Serializable {
     private Integer id;
     private String clusterName;
     private String taskName;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java
index 567514e..0fba607 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 import org.springframework.stereotype.Repository;
 
 import java.util.List;
@@ -26,16 +26,16 @@ import java.util.List;
 public interface SortClusterConfgiEntityMapper {
     int deleteByPrimaryKey(Integer id);
 
-    int insert(SortClusterConfgiEntity record);
+    int insert(SortClusterConfigEntity record);
 
-    int insertSelective(SortClusterConfgiEntity record);
+    int insertSelective(SortClusterConfigEntity record);
 
-    SortClusterConfgiEntity selectByPrimaryKey(Integer id);
+    SortClusterConfigEntity selectByPrimaryKey(Integer id);
 
-    int updateByPrimaryKeySelective(SortClusterConfgiEntity record);
+    int updateByPrimaryKeySelective(SortClusterConfigEntity record);
 
-    int updateByPrimaryKey(SortClusterConfgiEntity record);
+    int updateByPrimaryKey(SortClusterConfigEntity record);
 
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+    List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName);
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml
index 6a2020e..4064a51 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml
@@ -19,7 +19,7 @@
 -->
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper">
-  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
     <id column="id" jdbcType="INTEGER" property="id" />
     <result column="cluster_name" jdbcType="VARCHAR" property="clusterName" />
     <result column="task_name" jdbcType="VARCHAR" property="taskName" />
@@ -38,13 +38,13 @@
     delete from sort_cluster_config
     where id = #{id,jdbcType=INTEGER}
   </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
     insert into sort_cluster_config (id, cluster_name, task_name, 
       sink_type)
     values (#{id,jdbcType=INTEGER}, #{clusterName,jdbcType=VARCHAR}, #{taskName,jdbcType=VARCHAR}, 
       #{sinkType,jdbcType=VARCHAR})
   </insert>
-  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
     insert into sort_cluster_config
     <trim prefix="(" suffix=")" suffixOverrides=",">
       <if test="id != null">
@@ -75,7 +75,7 @@
       </if>
     </trim>
   </insert>
-  <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+  <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
     update sort_cluster_config
     <set>
       <if test="clusterName != null">
@@ -90,7 +90,7 @@
     </set>
     where id = #{id,jdbcType=INTEGER}
   </update>
-  <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+  <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
     update sort_cluster_config
     set cluster_name = #{clusterName,jdbcType=VARCHAR},
       task_name = #{taskName,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-plugin-examples/pom.xml b/inlong-manager/manager-plugin-examples/pom.xml
new file mode 100644
index 0000000..a4e1198
--- /dev/null
+++ b/inlong-manager/manager-plugin-examples/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>inlong-manager</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>0.13.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>inlong-manager-plugin</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>manager-common</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>META-INF/*</include>
+                </includes>
+            </resource>
+        </resources>
+        <finalName>${project.name}</finalName>
+    </build>
+</project>
\ No newline at end of file
diff --git a/inlong-manager/manager-plugin-examples/src/main/java/org/apache/inlong/manager/plugin/EmptyProcessPlugin.java b/inlong-manager/manager-plugin-examples/src/main/java/org/apache/inlong/manager/plugin/EmptyProcessPlugin.java
new file mode 100644
index 0000000..64d7cb1
--- /dev/null
+++ b/inlong-manager/manager-plugin-examples/src/main/java/org/apache/inlong/manager/plugin/EmptyProcessPlugin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+
+public class EmptyProcessPlugin implements ProcessPlugin {
+
+    @Override
+    public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
+        return new LinkedHashMap<>();
+    }
+
+    @Override
+    public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
+        return new LinkedHashMap<>();
+    }
+
+    @Override
+    public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
+        return ProcessPlugin.super.createSortOperateListeners();
+    }
+
+    @Override
+    public Map<StorageOperateListener, EventSelector> createStorageOperateListeners() {
+        return ProcessPlugin.super.createStorageOperateListeners();
+    }
+}
diff --git a/inlong-manager/manager-plugin-examples/src/main/resources/META-INF/plugin.yaml b/inlong-manager/manager-plugin-examples/src/main/resources/META-INF/plugin.yaml
new file mode 100644
index 0000000..689a149
--- /dev/null
+++ b/inlong-manager/manager-plugin-examples/src/main/resources/META-INF/plugin.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: example
+description: example for manager plugin
+javaVersion: 1.8
+pluginClass: org.apache.inlong.manager.plugin.EmptyProcessPlugin
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 9eecbc4..fd33e31 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -128,6 +128,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
index 75d75ac..1e98709 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 
 import java.util.List;
 
@@ -31,5 +31,5 @@ public interface SortClusterConfigService {
      * @param clusterName Name of sort cluster.
      * @return List of tasks, including task name and sink type.
      */
-    List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+    List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
index 2f78f64..51f9dc3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
 import org.apache.inlong.manager.service.core.SortClusterConfigService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +35,7 @@ public class SortClusterConfigServiceImpl implements SortClusterConfigService {
     private SortClusterConfgiEntityMapper sortClusterConfgiEntityMapper;
 
     @Override
-    public List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName) {
+    public List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName) {
         return sortClusterConfgiEntityMapper.selectTasksByClusterName(clusterName);
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 98f905f..3bea2dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -19,9 +19,9 @@ package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SinkType;
 import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SortTaskConfig;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
 import org.apache.inlong.manager.service.core.SortClusterConfigService;
 import org.apache.inlong.manager.service.core.SortTaskIdParamService;
 import org.apache.inlong.manager.service.core.SortService;
@@ -56,8 +56,8 @@ public class SortServiceImpl implements SortService {
         }
 
         // check if there is any task.
-        List<SortClusterConfgiEntity> tasks =
-                sortClusterConfigService.selectTasksByClusterName(clusterName);
+        List<SortClusterConfigEntity> tasks = sortClusterConfigService.selectTasksByClusterName(clusterName);
+
         if (tasks == null || tasks.isEmpty()) {
             String errMsg = "There is not any task for cluster" + clusterName;
             LOGGER.info(errMsg);
@@ -77,7 +77,7 @@ public class SortServiceImpl implements SortService {
         return SortClusterConfigResponse.builder().tasks(taskConfigs).msg("success").build();
     }
 
-    private SortTaskConfig getTaskConfig(SortClusterConfgiEntity clusterConfig) {
+    private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
         List<Map<String, String>> idParams =
                 sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
         // TODO add method that get sink params
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/JarHell.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/JarHell.java
new file mode 100644
index 0000000..5dd5520
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/JarHell.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import com.google.common.collect.Lists;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Helpers for parsing Java version strings and for checking that the Java version declared by a
+ * plugin is not newer than the version the manager supports.
+ */
+public class JarHell {
+
+    /**
+     * The Java version plugins are checked against. It is hard-coded to 1.8 rather than read from
+     * the running JVM.
+     */
+    public static final JavaVersion CURRENT_VERSION = new JavaVersion(Lists.newArrayList(1, 8), null);
+
+    /**
+     * A parsed Java version: dot-separated numeric components plus an optional pre-release part
+     * (the text following a single {@code -}).
+     */
+    public static class JavaVersion {
+
+        private final List<Integer> version;
+        private final String prePart;
+
+        /**
+         * @return the numeric components as an unmodifiable list; for a {@code 1.8} version the
+         *         leading {@code 1} has already been dropped
+         */
+        public List<Integer> getVersion() {
+            return version;
+        }
+
+        private JavaVersion(List<Integer> version, String prePart) {
+            this.prePart = prePart;
+            // Copy first so that later changes to the caller's list cannot leak into this instance.
+            List<Integer> components = new ArrayList<>(version);
+            if (components.size() >= 2 && components.get(0) == 1 && components.get(1) == 8) {
+                // for Java 8 there is ambiguity since both 1.8 and 8 are supported,
+                // so 1.8[.x] is normalized to 8[.x]
+                components = new ArrayList<>(components.subList(1, components.size()));
+            }
+            this.version = Collections.unmodifiableList(components);
+        }
+
+        /**
+         * @return the numeric components joined by {@code .}, followed by {@code -prePart} only
+         *         when a pre-release part is present
+         */
+        @Override
+        public String toString() {
+            String numeric = StringUtils.join(version, '.');
+            return prePart == null ? numeric : numeric + "-" + prePart;
+        }
+
+        /**
+         * Compares this version with another one. Numeric components are compared position by
+         * position (missing components count as 0). If they are all equal, a version without a
+         * pre-release part is greater than one with a pre-release part, and two pre-release parts
+         * are compared by {@link #comparePrePart(String, String)}.
+         *
+         * @param o the version to compare with
+         * @return a negative value, zero or a positive value if this version is lower than, equal
+         *         to or higher than {@code o}
+         */
+        public int compareTo(JavaVersion o) {
+            int len = Math.max(version.size(), o.version.size());
+            for (int i = 0; i < len; i++) {
+                int mine = (i < version.size() ? version.get(i) : 0);
+                int theirs = (i < o.version.size() ? o.version.get(i) : 0);
+                if (mine > theirs) {
+                    return 1;
+                }
+                if (mine < theirs) {
+                    return -1;
+                }
+            }
+            if (prePart != null && o.prePart == null) {
+                return -1;
+            } else if (prePart == null && o.prePart != null) {
+                return 1;
+            } else if (prePart != null && o.prePart != null) {
+                return comparePrePart(prePart, o.prePart);
+            }
+            return 0;
+        }
+
+        /**
+         * Compares two pre-release parts: purely numeric parts are compared as numbers and sort
+         * before non-numeric parts; two non-numeric parts are compared lexicographically.
+         */
+        private int comparePrePart(String prePart, String otherPrePart) {
+            if (prePart.matches("\\d+")) {
+                return otherPrePart.matches("\\d+")
+                        ? (new BigInteger(prePart)).compareTo(new BigInteger(otherPrePart)) : -1;
+            } else {
+                return otherPrePart.matches("\\d+")
+                        ? 1 : prePart.compareTo(otherPrePart);
+            }
+        }
+    }
+
+    private JarHell() {
+    }
+
+    /**
+     * Verifies that the Java version required by a resource is not newer than
+     * {@link #CURRENT_VERSION}.
+     *
+     * @param resource name of the resource (used in the error message only)
+     * @param javaVersion the Java version the resource requires
+     * @throws IllegalArgumentException if {@code javaVersion} cannot be parsed or is newer than
+     *         {@link #CURRENT_VERSION}
+     */
+    public static void checkJavaVersion(String resource, String javaVersion) {
+        JavaVersion version = parse(javaVersion);
+        if (CURRENT_VERSION.compareTo(version) < 0) {
+            // The resource requires `javaVersion`; the system provides CURRENT_VERSION.
+            throw new IllegalArgumentException(
+                    String.format("%s requires Java %s:, your system: %s", resource, javaVersion,
+                            CURRENT_VERSION));
+        }
+    }
+
+    /**
+     * Parses a version string such as {@code 1.8}, {@code 11} or {@code 17-ea}.
+     *
+     * @param value the version string, must not be {@code null}
+     * @return the parsed version
+     * @throws NullPointerException if {@code value} is {@code null}
+     * @throws IllegalArgumentException if {@code value} is not a valid version string
+     */
+    public static JavaVersion parse(String value) {
+        Objects.requireNonNull(value);
+        String prePart = null;
+        if (!isValid(value)) {
+            throw new IllegalArgumentException("Java version string [" + value + "] could not be parsed.");
+        }
+        List<Integer> version = new ArrayList<>();
+        String[] parts = value.split("-");
+        String[] numericComponents;
+        if (parts.length == 1) {
+            numericComponents = value.split("\\.");
+        } else if (parts.length == 2) {
+            numericComponents = parts[0].split("\\.");
+            prePart = parts[1];
+        } else {
+            throw new IllegalArgumentException("Java version string [" + value + "] could not be parsed.");
+        }
+
+        for (String component : numericComponents) {
+            version.add(Integer.valueOf(component));
+        }
+        return new JavaVersion(version, prePart);
+    }
+
+    /**
+     * @param value the version string to check
+     * @return {@code true} if {@code value} consists of dot-separated numbers optionally followed
+     *         by {@code -} and an alphanumeric pre-release part
+     */
+    public static boolean isValid(String value) {
+        return value.matches("^0*[0-9]+(\\.[0-9]+)*(-[a-zA-Z0-9]+)?$");
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
new file mode 100644
index 0000000..2071f48
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Collectors;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+
+@Slf4j
+public class PluginClassLoader extends URLClassLoader {
+
+    public static final String PLUGIN_PATH = "META-INF/plugin.yaml";
+
+    /**
+     * plugin.yaml should less than 1k
+     */
+    public static final int PLUGIN_DEF_CAPACITY = 1024;
+
+    /**
+     * pluginName -> pluginDefinition
+     */
+    private Map<String, PluginDefinition> pluginDefinitionMap = new HashMap<>();
+
+    private File pluginDirectory;
+
+    private ObjectMapper yamlMapper;
+
+    private PluginClassLoader(URL url, ClassLoader parent) throws IOException {
+        super(new URL[]{url}, parent);
+        this.pluginDirectory = new File(url.getPath());
+        initYamlMapper();
+        loadPluginDefinition();
+    }
+
+    public static PluginClassLoader getFromPluginUrl(String url, ClassLoader parent) {
+        checkClassLoader(parent);
+        checkUrl(url);
+        return AccessController.doPrivileged(new PrivilegedAction<PluginClassLoader>() {
+            @SneakyThrows
+            @Override
+            public PluginClassLoader run() {
+                return new PluginClassLoader(new URL("file://" + url), parent);
+            }
+        });
+    }
+
+    public Map<String, PluginDefinition> getPluginDefinitions() {
+        return this.pluginDefinitionMap;
+    }
+
+    private void initYamlMapper() {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+        mapper.setSerializationInclusion(Include.NON_NULL);
+        this.yamlMapper = mapper;
+    }
+
+    /**
+     * load pluginDefinition in **.jar/META-INF/plugin.yaml
+     */
+    private void loadPluginDefinition() throws IOException {
+        List<PluginDefinition> definitions = new ArrayList();
+        for (File jarFile : pluginDirectory.listFiles()) {
+            if (!jarFile.getName().endsWith(".jar")) {
+                log.warn("{}' is not plugin jar , please check", jarFile);
+                continue;
+            }
+            JarFile pluginJar = new JarFile(jarFile);
+            String pluginDef = readPluginDef(pluginJar);
+            pluginDef = pluginDef.replaceAll("[\\x00]+","");
+            PluginDefinition definition = yamlMapper.readValue(pluginDef, PluginDefinition.class);
+            addURL(new URL("file://" + jarFile.getAbsolutePath()));
+            checkPluginValid(jarFile, definition);
+            definitions.add(definition);
+        }
+        pluginDefinitionMap = definitions.stream()
+                .collect(Collectors.toMap(definition -> definition.getName(), definition -> definition));
+    }
+
+    private void checkPluginValid(File jarFile, PluginDefinition pluginDefinition) {
+        if (StringUtils.isEmpty(pluginDefinition.getName())) {
+            throw new RuntimeException(String.format("%s should define pluginName in plugin.yaml", jarFile.getName()));
+        }
+        if (StringUtils.isEmpty(pluginDefinition.getPluginClass())) {
+            throw new RuntimeException(String.format("%s should define pluginClass in plugin.yaml", jarFile.getName()));
+        }
+        try {
+            this.loadClass(pluginDefinition.getPluginClass());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(
+                    String.format("pluginClass '%s' could not be found in %s", pluginDefinition.getPluginClass(),
+                            jarFile.getName()));
+        }
+        if (StringUtils.isEmpty(pluginDefinition.getDescription())) {
+            log.warn(String.format("%s should define description in plugin.yaml", jarFile.getName()));
+        }
+        if (StringUtils.isEmpty(pluginDefinition.getJavaVersion())) {
+            throw new RuntimeException(String.format("%s should define javaVersion in plugin.yaml", jarFile.getName()));
+        }
+        JarHell.checkJavaVersion(pluginDefinition.getName(), pluginDefinition.getJavaVersion());
+    }
+
+    private String readPluginDef(JarFile jar) throws IOException {
+        JarEntry entry = jar.getJarEntry(PLUGIN_PATH);
+        if (entry == null) {
+            throw new RuntimeException(String.format("%s is not found in jar '%s'", PLUGIN_PATH, jar.getName()));
+        }
+        ByteBuffer buffer = ByteBuffer.allocate(PLUGIN_DEF_CAPACITY);
+        int bt;
+        try (InputStream is = jar.getInputStream(entry)) {
+            while ((bt = is.read()) != -1) {
+                buffer.put((byte) bt);
+            }
+        }
+        return new String(buffer.array(), StandardCharsets.UTF_8);
+    }
+
+    private static void checkClassLoader(ClassLoader classLoader) {
+        if (classLoader == null) {
+            throw new RuntimeException("parent classLoader should not be null");
+        }
+    }
+
+    private static void checkUrl(String url) {
+        if (StringUtils.isBlank(url)) {
+            throw new IllegalArgumentException("url should not be empty");
+        }
+        File pluginDirectory = new File(url);
+        if (!pluginDirectory.exists()) {
+            throw new RuntimeException(String.format("pluginDirectory '%s' is not exists", pluginDirectory));
+        }
+        if (!pluginDirectory.isDirectory()) {
+            throw new RuntimeException(String.format("pluginDirectory '%s' should be directory", pluginDirectory));
+        }
+        if (!pluginDirectory.canRead()) {
+            throw new RuntimeException(String.format("pluginDirectory '%s' is not readable", pluginDirectory));
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginService.java
new file mode 100644
index 0000000..7c364e2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class PluginService {
+
+    public static final String DEFAULT_PLUGIN_LOC = "/plugins";
+
+    @Setter
+    @Getter
+    @Value("${plugin.location?:''}")
+    private String pluginLoc;
+
+    @Getter
+    @Autowired
+    private List<PluginBinder> pluginBinders;
+
+    @Getter
+    private List<Plugin> plugins = new ArrayList<>();
+
+    public PluginService() {
+        if (StringUtils.isBlank(pluginLoc)) {
+            pluginLoc = DEFAULT_PLUGIN_LOC;
+        }
+        pluginReload();
+    }
+
+    public void pluginReload() {
+        Path path = Paths.get(pluginLoc).toAbsolutePath();
+        log.info("search for plugin in {}", pluginLoc);
+        if (!path.toFile().exists()) {
+            log.warn("plugin directory not found");
+            return;
+        }
+        PluginClassLoader pluginLoader = PluginClassLoader.getFromPluginUrl(path.toString(),
+                Thread.currentThread().getContextClassLoader());
+        Map<String, PluginDefinition> pluginDefinitions = pluginLoader.getPluginDefinitions();
+        if (MapUtils.isEmpty(pluginDefinitions)) {
+            log.warn("pluginDefinition not found in {}", pluginLoc);
+            return;
+        }
+        List<Plugin> plugins = new ArrayList<>();
+        for (PluginDefinition pluginDefinition : pluginDefinitions.values()) {
+            String pluginClassName = pluginDefinition.getPluginClass();
+            try {
+                Class pluginClass = pluginLoader.loadClass(pluginClassName);
+                Object plugin = pluginClass.getDeclaredConstructor().newInstance();
+                plugins.add((Plugin) plugin);
+            } catch (Throwable e) {
+                throw new RuntimeException(e.getMessage());
+            }
+        }
+        this.plugins.addAll(plugins);
+        for (PluginBinder pluginBinder : pluginBinders) {
+            for (Plugin plugin : plugins) {
+                pluginBinder.acceptPlugin(plugin);
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelector.java
new file mode 100644
index 0000000..35d4cc2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.hive;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.model.definition.ProcessForm;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.StorageService;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CreateHiveTableEventSelector implements EventSelector {
+
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private DataStreamEntityMapper streamMapper;
+
+    @Override
+    public boolean accept(WorkflowContext context) {
+        ProcessForm processForm = context.getProcessForm();
+        if (processForm == null || !(processForm instanceof BusinessResourceWorkflowForm)) {
+            return false;
+        }
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) processForm;
+        if (form.getBusinessInfo() == null || StringUtils.isEmpty(form.getBusinessInfo().getInlongGroupId())) {
+            return false;
+        }
+        String groupId = form.getInlongGroupId();
+        List<String> dsForHive = storageService.filterStreamIdByStorageType(groupId, BizConstant.STORAGE_HIVE,
+                streamMapper.selectByGroupId(groupId)
+                        .stream()
+                        .map(DataStreamEntity::getInlongStreamId)
+                        .collect(Collectors.toList()));
+        //todo check if create hive automatically
+        if (CollectionUtils.isEmpty(dsForHive)) {
+            log.warn("groupId={} streamId={} does not have storage, skip to create hive table ",
+                    groupId, form.getInlongStreamId());
+            return true;
+        }
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
index 1975c20..420afa2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.thirdpart.hive;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
 import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
 import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
  */
 @Service
 @Slf4j
-public class CreateHiveTableForStreamListener implements TaskEventListener {
+public class CreateHiveTableForStreamListener implements StorageOperateListener {
 
     @Autowired
     private StorageHiveEntityMapper hiveEntityMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
index 45ac9e0..98c41c1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.thirdpart.hive;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
 import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
 import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
  */
 @Service
 @Slf4j
-public class CreateHiveTableListener implements TaskEventListener {
+public class CreateHiveTableListener implements StorageOperateListener {
 
     @Autowired
     private StorageHiveEntityMapper hiveEntityMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
index 54923dd..c92e0a7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -20,7 +20,12 @@ package org.apache.inlong.manager.service.thirdpart.mq;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -30,11 +35,6 @@ import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.core.StorageService;
 import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
 import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -44,7 +44,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CreatePulsarGroupForStreamTaskListener implements TaskEventListener {
+public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListener {
 
     @Autowired
     private ClusterBean clusterBean;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
index 16f230f..ddea360 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -28,11 +33,6 @@ import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
 import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -42,7 +42,7 @@ import org.springframework.stereotype.Component;
  */
 @Component
 @Slf4j
-public class CreatePulsarGroupTaskListener implements TaskEventListener {
+public class CreatePulsarGroupTaskListener implements QueueOperateListener {
 
     @Autowired
     private ClusterBean clusterBean;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
index 65844b8..7673810 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -30,11 +35,6 @@ import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
 import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -44,7 +44,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component()
-public class CreatePulsarResourceTaskListener implements TaskEventListener {
+public class CreatePulsarResourceTaskListener implements QueueOperateListener {
 
     @Autowired
     PulsarOptService pulsarOptService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
index 0a1e72e..8eaf2e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -27,11 +32,6 @@ import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
 import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CreatePulsarTopicForStreamTaskListener implements TaskEventListener {
+public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListener {
 
     @Autowired
     private ClusterBean clusterBean;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
index 1911e38..19ba81b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
 import java.util.Collections;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ReTryConfigBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
@@ -27,18 +32,13 @@ import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 @Component
 @Slf4j
-public class CreateTubeGroupTaskListener implements TaskEventListener {
+public class CreateTubeGroupTaskListener implements QueueOperateListener {
 
     @Autowired
     BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
index 02165a6..1933351 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.thirdpart.mq;
 
 import java.util.Collections;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
-import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
 import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -36,7 +36,7 @@ import org.springframework.stereotype.Component;
  */
 @Component
 @Slf4j
-public class CreateTubeTopicTaskListener implements TaskEventListener {
+public class CreateTubeTopicTaskListener implements QueueOperateListener {
 
     @Autowired
     private TubeMqOptService tubeMqOptService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarEventSelector.java
new file mode 100644
index 0000000..8e1be46
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarEventSelector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+
+/**
+ * Selector that accepts a workflow context only when the business uses Pulsar as its middleware.
+ */
+@Slf4j
+public class PulsarEventSelector implements EventSelector {
+
+    /**
+     * Checks the middleware type of the business carried by the process form.
+     *
+     * @param context workflow context whose process form is a {@link BusinessResourceWorkflowForm}
+     * @return {@code true} if the middleware type equals the Pulsar constant ignoring case
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        BusinessResourceWorkflowForm resourceForm = (BusinessResourceWorkflowForm) context.getProcessForm();
+        String type = resourceForm.getBusinessInfo().getMiddlewareType();
+        boolean isPulsar = BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(type);
+        if (!isPulsar) {
+            log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
+                    resourceForm.getInlongGroupId(), type);
+        }
+        return isPulsar;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeEventSelector.java
new file mode 100644
index 0000000..415a03a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeEventSelector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+
+/**
+ * Selector that accepts a workflow context only when the business uses Tube as its middleware.
+ */
+@Slf4j
+public class TubeEventSelector implements EventSelector {
+
+    /**
+     * Checks the middleware type of the business carried by the process form.
+     *
+     * @param context workflow context whose process form is a {@link BusinessResourceWorkflowForm}
+     * @return {@code true} if the middleware type equals the Tube constant ignoring case
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+        BusinessInfo businessInfo = form.getBusinessInfo();
+        if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
+            return true;
+        }
+        // Argument order must follow the placeholders: group id first, then middleware type.
+        log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
+                form.getInlongGroupId(), businessInfo.getMiddlewareType());
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index 5b6f1a9..bae7133 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -28,6 +28,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
@@ -40,11 +45,6 @@ import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
 import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
 import org.apache.inlong.sort.ZkTools;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
@@ -64,7 +64,7 @@ import org.springframework.stereotype.Component;
 
 @Slf4j
 @Component
-public class PushHiveConfigTaskListener implements TaskEventListener {
+public class PushHiveConfigTaskListener implements SortOperateListener {
 
     private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<>();
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/ZkSortEventSelector.java
similarity index 62%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/ZkSortEventSelector.java
index 3a480eb..4a7670f 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/ZkSortEventSelector.java
@@ -15,17 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.thirdpart.sort;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.springframework.stereotype.Component;
 
-@Configuration
-public class BaseConfig {
-    @Bean
-    public RestTemplate restTemplate() {
-        return new RestTemplate();
-    }
+/**
+ * Event selector for pushing sort config to ZooKeeper.
+ *
+ * <p>Currently a placeholder: {@link #accept(WorkflowContext)} returns {@code true} for every context.
+ */
+@Component
+@Slf4j
+public class ZkSortEventSelector implements EventSelector {
 
+    /**
+     * Accepts every workflow context unconditionally.
+     *
+     * @param context workflow context (not inspected yet)
+     * @return always {@code true}
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        // TODO: check whether the sort config should be pushed to ZooKeeper
+        return true;
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactory.java
new file mode 100644
index 0000000..810b7dd
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.PostConstruct;
+import org.apache.commons.collections.MapUtils;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableEventSelector;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.PulsarEventSelector;
+import org.apache.inlong.manager.service.thirdpart.mq.TubeEventSelector;
+import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
+import org.apache.inlong.manager.service.thirdpart.sort.ZkSortEventSelector;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Registry of the operate listeners used by workflow tasks, grouped by the kind of resource they
+ * handle: data source, storage, queue and sort.
+ *
+ * <p>Each listener is stored with an {@link EventSelector} and is only returned for a
+ * {@link WorkflowContext} that its selector accepts. Built-in listeners are registered in
+ * {@link #init()}; listeners contributed by a {@link ProcessPlugin} are added through
+ * {@link #acceptPlugin(Plugin)}.
+ */
+@Component
+public class TaskEventListenerFactory implements PluginBinder {
+
+    // LinkedHashMap iterates in insertion order, so lookups return listeners in registration order.
+    private final Map<DataSourceOperateListener, EventSelector> sourceOperateListeners = new LinkedHashMap<>();
+
+    private final Map<StorageOperateListener, EventSelector> storageOperateListeners = new LinkedHashMap<>();
+
+    private final Map<QueueOperateListener, EventSelector> queueOperateListeners = new LinkedHashMap<>();
+
+    private final Map<SortOperateListener, EventSelector> sortOperateListeners = new LinkedHashMap<>();
+
+    @Autowired
+    private CreateTubeTopicTaskListener createTubeTopicTaskListener;
+    @Autowired
+    private CreateTubeGroupTaskListener createTubeGroupTaskListener;
+    @Autowired
+    private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
+    @Autowired
+    private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
+
+    @Autowired
+    private CreateHiveTableListener createHiveTableListener;
+    @Autowired
+    private CreateHiveTableEventSelector createHiveTableEventSelector;
+
+    @Autowired
+    private PushHiveConfigTaskListener pushHiveConfigTaskListener;
+    @Autowired
+    private ZkSortEventSelector zkSortEventSelector;
+
+    /**
+     * Registers the built-in listeners together with their selectors after dependency injection.
+     */
+    @PostConstruct
+    public void init() {
+        storageOperateListeners.put(createHiveTableListener, createHiveTableEventSelector);
+        queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
+        queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
+        queueOperateListeners.put(createPulsarResourceTaskListener, new PulsarEventSelector());
+        queueOperateListeners.put(createPulsarGroupTaskListener, new PulsarEventSelector());
+        sortOperateListeners.put(pushHiveConfigTaskListener, zkSortEventSelector);
+    }
+
+    /**
+     * Returns the data source listeners whose selector accepts the given context.
+     *
+     * @param context workflow context handed to every selector
+     * @return the accepted listeners in registration order, possibly empty
+     */
+    public List<DataSourceOperateListener> getDSOperateListener(WorkflowContext context) {
+        return selectListeners(sourceOperateListeners, context);
+    }
+
+    /**
+     * Returns the storage listeners whose selector accepts the given context.
+     *
+     * @param context workflow context handed to every selector
+     * @return the accepted listeners in registration order, possibly empty
+     */
+    public List<StorageOperateListener> getStorageOperateListener(WorkflowContext context) {
+        return selectListeners(storageOperateListeners, context);
+    }
+
+    /**
+     * Returns the queue listeners whose selector accepts the given context.
+     *
+     * @param context workflow context handed to every selector
+     * @return the accepted listeners in registration order, possibly empty
+     */
+    public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
+        return selectListeners(queueOperateListeners, context);
+    }
+
+    /**
+     * Returns the sort listeners whose selector accepts the given context.
+     *
+     * @param context workflow context handed to every selector
+     * @return the accepted listeners in registration order, possibly empty
+     */
+    public List<SortOperateListener> getSortOperateListener(WorkflowContext context) {
+        return selectListeners(sortOperateListeners, context);
+    }
+
+    /**
+     * Adds the listeners contributed by a plugin to this registry.
+     *
+     * <p>Plugins that are not a {@link ProcessPlugin} are ignored, and so are listener maps that are
+     * null or empty.
+     *
+     * @param plugin the plugin to bind
+     */
+    @Override
+    public void acceptPlugin(Plugin plugin) {
+        if (!(plugin instanceof ProcessPlugin)) {
+            return;
+        }
+        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
+        Map<DataSourceOperateListener, EventSelector> pluginDsOperateListeners =
+                processPlugin.createSourceOperateListeners();
+        if (MapUtils.isNotEmpty(pluginDsOperateListeners)) {
+            // Reuse the map that was just checked instead of asking the plugin to create a second one.
+            sourceOperateListeners.putAll(pluginDsOperateListeners);
+        }
+        Map<StorageOperateListener, EventSelector> pluginStorageOperateListeners =
+                processPlugin.createStorageOperateListeners();
+        if (MapUtils.isNotEmpty(pluginStorageOperateListeners)) {
+            storageOperateListeners.putAll(pluginStorageOperateListeners);
+        }
+        Map<QueueOperateListener, EventSelector> pluginQueueOperateListeners =
+                processPlugin.createQueueOperateListeners();
+        if (MapUtils.isNotEmpty(pluginQueueOperateListeners)) {
+            queueOperateListeners.putAll(pluginQueueOperateListeners);
+        }
+        Map<SortOperateListener, EventSelector> pluginSortOperateListeners =
+                processPlugin.createSortOperateListeners();
+        if (MapUtils.isNotEmpty(pluginSortOperateListeners)) {
+            sortOperateListeners.putAll(pluginSortOperateListeners);
+        }
+    }
+
+    /**
+     * Collects the keys of {@code candidates} whose selector is non-null and accepts {@code context}.
+     *
+     * @param candidates listeners mapped to the selector that guards them
+     * @param context workflow context handed to every selector
+     * @param <T> listener type
+     * @return the accepted listeners in the iteration order of {@code candidates}
+     */
+    private static <T> List<T> selectListeners(Map<T, EventSelector> candidates, WorkflowContext context) {
+        List<T> accepted = new ArrayList<>();
+        for (Map.Entry<T, EventSelector> entry : candidates.entrySet()) {
+            EventSelector selector = entry.getValue();
+            if (selector != null && selector.accept(context)) {
+                accepted.add(entry.getKey());
+            }
+        }
+        return accepted;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseConfig.java
similarity index 95%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseConfig.java
index 3a480eb..653bdb5 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseTest.java
similarity index 97%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseTest.java
index 06b0c18..12c33ab 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseTest.java
@@ -1,4 +1,3 @@
-package org.apache.inlong.manager.service.core;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
@@ -16,6 +15,8 @@ package org.apache.inlong.manager.service.core;
  * limitations under the License.
  */
 
+package org.apache.inlong.manager.service;
+
 import org.junit.runner.RunWith;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/JarHellTest.java
similarity index 60%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/JarHellTest.java
index 3a480eb..83742c8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/JarHellTest.java
@@ -15,17 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.core.plugin;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+import org.junit.Assert;
+import org.junit.Test;
 
-@Configuration
-public class BaseConfig {
-    @Bean
-    public RestTemplate restTemplate() {
-        return new RestTemplate();
+/**
+ * Tests for {@code JarHell#checkJavaVersion}.
+ */
+public class JarHellTest {
+
+    /**
+     * Calls {@code checkJavaVersion} with the version string "1.8" and then with "1.81"; if the
+     * second call throws, the exception type and message are verified.
+     */
+    @Test
+    public void testJavaVersion() {
+        JarHell.checkJavaVersion("test_java", "1.8");
+        try {
+            JarHell.checkJavaVersion("test_java", "1.81");
+            // NOTE(review): there is no Assert.fail() here, so the test also passes when no exception
+            // is thrown - confirm whether "1.81" is expected to be rejected on every supported JVM.
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof IllegalArgumentException);
+            String msg = e.getMessage();
+            Assert.assertTrue(msg.contains("requires Java 1.8"));
+        }
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
new file mode 100644
index 0000000..4bf8987
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import com.google.common.collect.Lists;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PluginClassLoaderTest {
+
+    @Test
+    public void testLoadPlugin() {
+        String path = this.getClass().getClassLoader().getResource("").getPath();
+        PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
+                Thread.currentThread()
+                        .getContextClassLoader());
+        Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
+        Assert.assertTrue(pluginDefinitionMap.size() == 1);
+        PluginDefinition pluginDefinition = Lists.newArrayList(pluginDefinitionMap.values()).get(0);
+        Assert.assertTrue(pluginDefinition != null);
+        String pluginClass = pluginDefinition.getPluginClass();
+        Assert.assertTrue(StringUtils.isNotEmpty(pluginClass));
+        try {
+            Class cls = pluginClassLoader.loadClass(pluginClass);
+            Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
+            Assert.assertTrue(plugin instanceof ProcessPlugin);
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | InstantiationException
+                | IllegalAccessException
+                | InvocationTargetException e) {
+            Assert.assertTrue(e instanceof ClassNotFoundException);
+            Assert.fail();
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginServiceTest.java
similarity index 56%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginServiceTest.java
index ea3de03..3ce65fc 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginServiceTest.java
@@ -15,25 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.business;
+package org.apache.inlong.manager.service.core.plugin;
 
-import org.apache.inlong.manager.service.core.BaseTest;
+import java.util.List;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+import org.apache.inlong.manager.service.BaseTest;
 import org.junit.Assert;
-import org.apache.inlong.manager.common.model.definition.Process;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-public class CreateBusinessWorkflowDefinitionTest extends BaseTest {
+public class PluginServiceTest extends BaseTest {
 
     @Autowired
-    CreateBusinessWorkflowDefinition createBusinessWorkflowDefinition;
+    PluginService pluginService;
 
     @Test
-    public void testDefineProcess() {
-        Process process = createBusinessWorkflowDefinition.defineProcess();
-        Assert.assertTrue("Business Resource Creation".equals(process.getType()));
-        Assert.assertTrue(process.getTaskByName("createHiveTableTask") != null);
-        Assert.assertTrue(process.getNameToTaskMap().size() == 6);
+    public void testReloadPlugin() {
+        // Set the plugin location to the "plugins" folder next to the test resources before reloading.
+        pluginService.setPluginLoc(this.getClass().getClassLoader().getResource("").getPath() + "plugins");
+        pluginService.pluginReload();
+        List<Plugin> pluginList = pluginService.getPlugins();
+        Assert.assertEquals(1, pluginList.size());
+        Assert.assertTrue(pluginList.get(0) instanceof ProcessPlugin);
     }
-
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelectorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelectorTest.java
new file mode 100644
index 0000000..3b506af
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelectorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.hive;
+
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.service.BaseTest;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/** Exercises {@link CreateHiveTableEventSelector#accept} with progressively more complete workflow forms. */
+public class CreateHiveTableEventSelectorTest extends BaseTest {
+    @Autowired
+    CreateHiveTableEventSelector createHiveTableEventSelector;
+
+    /** Asserts rejection without business info or group id, and acceptance once the group id is "test". */
+    @Test
+    public void testAccept() {
+        BusinessResourceWorkflowForm form = new BusinessResourceWorkflowForm();
+        WorkflowContext context = new WorkflowContext();
+        context.setProcessForm(form);
+        Assert.assertFalse(createHiveTableEventSelector.accept(context));
+        form.setBusinessInfo(new BusinessInfo());
+        Assert.assertFalse(createHiveTableEventSelector.accept(context));
+        form.getBusinessInfo().setInlongGroupId("test");
+        Assert.assertTrue(createHiveTableEventSelector.accept(context));
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactoryTest.java
new file mode 100644
index 0000000..81a08da
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow;
+
+import java.util.List;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.service.BaseTest;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/** Checks which queue listeners {@link TaskEventListenerFactory} returns for each middleware type. */
+public class TaskEventListenerFactoryTest extends BaseTest {
+    @Autowired
+    TaskEventListenerFactory taskEventListenerFactory;
+
+    /** Expects two listeners, in a fixed order, for both the Pulsar and the Tube middleware type. */
+    @Test
+    public void testGetQueueOperateListener() {
+        WorkflowContext context = new WorkflowContext();
+        BusinessResourceWorkflowForm processForm = new BusinessResourceWorkflowForm();
+        BusinessInfo businessInfo = new BusinessInfo();
+        // check pulsar listener
+        businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
+        processForm.setBusinessInfo(businessInfo);
+        context.setProcessForm(processForm);
+        List<QueueOperateListener> queueOperateListeners = taskEventListenerFactory.getQueueOperateListener(context);
+        Assert.assertEquals(2, queueOperateListeners.size());
+        Assert.assertTrue(queueOperateListeners.get(0) instanceof CreatePulsarResourceTaskListener);
+        Assert.assertTrue(queueOperateListeners.get(1) instanceof CreatePulsarGroupTaskListener);
+        // check tube listener (same BusinessInfo instance, middleware type switched)
+        businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_TUBE);
+        queueOperateListeners = taskEventListenerFactory.getQueueOperateListener(context);
+        Assert.assertEquals(2, queueOperateListeners.size());
+        Assert.assertTrue(queueOperateListeners.get(0) instanceof CreateTubeTopicTaskListener);
+        Assert.assertTrue(queueOperateListeners.get(1) instanceof CreateTubeGroupTaskListener);
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
index ea3de03..754da1d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.manager.service.workflow.business;
 
-import org.apache.inlong.manager.service.core.BaseTest;
-import org.junit.Assert;
 import org.apache.inlong.manager.common.model.definition.Process;
+import org.apache.inlong.manager.service.BaseTest;
+import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
diff --git a/inlong-manager/manager-service/src/test/resources/application-test.properties b/inlong-manager/manager-service/src/test/resources/application-test.properties
index 5e56f81..7603e24 100644
--- a/inlong-manager/manager-service/src/test/resources/application-test.properties
+++ b/inlong-manager/manager-service/src/test/resources/application-test.properties
@@ -19,10 +19,12 @@
 # Log level
 logging.level.root=INFO
 logging.level.org.apache.inlong.manager=debug
-spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
+spring.datasource.druid.url=jdbc:h2:mem:test;MODE=MYSQL;DB_CLOSE_DELAY=-1;IGNORECASE=TRUE;
 spring.datasource.druid.username=root
-spring.datasource.druid.password=inlong
-spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
+spring.datasource.druid.password=""
+spring.datasource.druid.driver-class-name=org.h2.Driver
+spring.datasource.schema=classpath:sql/apache_inlong_manager.sql
+spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
 spring.datasource.druid.validationQuery=SELECT 'x'
 # Initialization size, minimum, maximum
 spring.datasource.druid.initialSize=20
diff --git a/inlong-manager/manager-service/src/test/resources/plugins/inlong-manager-plugin.jar b/inlong-manager/manager-service/src/test/resources/plugins/inlong-manager-plugin.jar
new file mode 100644
index 0000000..2e52e83
Binary files /dev/null and b/inlong-manager/manager-service/src/test/resources/plugins/inlong-manager-plugin.jar differ
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
index 2e4e1b3..dddd183 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
@@ -17,7 +17,11 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
 import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
@@ -33,10 +37,6 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.context.WebApplicationContext;
 
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest
 public class SortControllerTest {
@@ -107,8 +107,8 @@ public class SortControllerTest {
                 .build();
     }
 
-    private SortClusterConfgiEntity prepareClusterConfigEntity(String taskName, String sinkType) {
-        return SortClusterConfgiEntity.builder()
+    private SortClusterConfigEntity prepareClusterConfigEntity(String taskName, String sinkType) {
+        return SortClusterConfigEntity.builder()
                 .clusterName("testCluster")
                 .taskName(taskName)
                 .sinkType(sinkType)
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 955f80a..58945cc 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -264,7 +264,11 @@
                 <artifactId>jackson-datatype-jsr310</artifactId>
                 <version>${jackson.jsr310.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-yaml</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
             <dependency>
                 <groupId>io.swagger</groupId>
                 <artifactId>swagger-annotations</artifactId>