You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/26 12:51:28 UTC
[incubator-inlong] branch master updated: [INLONG-2317] Support Plugin in Inlong Manager (#2319)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6bc4b37 [INLONG-2317] Support Plugin in Inlong Manager (#2319)
6bc4b37 is described below
commit 6bc4b379611ba0af7fefd19ee137f44aa6f2b47d
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Jan 26 20:51:23 2022 +0800
[INLONG-2317] Support Plugin in Inlong Manager (#2319)
---
.../manager/common/event/EventSelector.java} | 18 +--
.../event/task/DataSourceOperateListener.java} | 33 ++--
.../common/event/task/QueueOperateListener.java} | 33 ++--
.../common/event/task/SortOperateListener.java} | 32 ++--
.../common/event/task/StorageOperateListener.java} | 32 ++--
.../inlong/manager/common/plugin/Plugin.java} | 13 +-
.../manager/common/plugin/PluginBinder.java} | 13 +-
.../manager/common/plugin/PluginDefinition.java} | 37 +++--
.../manager/common/plugin/ProcessPlugin.java | 45 ++++++
...fgiEntity.java => SortClusterConfigEntity.java} | 2 +-
.../dao/mapper/SortClusterConfgiEntityMapper.java | 14 +-
.../mappers/SortClusterConfgiEntityMapper.xml | 10 +-
inlong-manager/manager-plugin-examples/pom.xml | 56 +++++++
.../inlong/manager/plugin/EmptyProcessPlugin.java | 50 ++++++
.../src/main/resources/META-INF/plugin.yaml | 23 +++
inlong-manager/manager-service/pom.xml | 4 +
.../service/core/SortClusterConfigService.java | 4 +-
.../core/impl/SortClusterConfigServiceImpl.java | 4 +-
.../manager/service/core/impl/SortServiceImpl.java | 8 +-
.../manager/service/core/plugin/JarHell.java | 127 +++++++++++++++
.../service/core/plugin/PluginClassLoader.java | 177 +++++++++++++++++++++
.../manager/service/core/plugin/PluginService.java | 94 +++++++++++
.../hive/CreateHiveTableEventSelector.java | 69 ++++++++
.../hive/CreateHiveTableForStreamListener.java | 4 +-
.../thirdpart/hive/CreateHiveTableListener.java | 4 +-
.../mq/CreatePulsarGroupForStreamTaskListener.java | 12 +-
.../mq/CreatePulsarGroupTaskListener.java | 12 +-
.../mq/CreatePulsarResourceTaskListener.java | 12 +-
.../mq/CreatePulsarTopicForStreamTaskListener.java | 12 +-
.../thirdpart/mq/CreateTubeGroupTaskListener.java | 12 +-
.../thirdpart/mq/CreateTubeTopicTaskListener.java | 12 +-
.../service/thirdpart/mq/PulsarEventSelector.java | 40 +++++
.../service/thirdpart/mq/TubeEventSelector.java | 41 +++++
.../thirdpart/sort/PushHiveConfigTaskListener.java | 12 +-
.../thirdpart/sort/ZkSortEventSelector.java} | 23 +--
.../service/workflow/TaskEventListenerFactory.java | 163 +++++++++++++++++++
.../manager/service/{core => }/BaseConfig.java | 2 +-
.../manager/service/{core => }/BaseTest.java | 3 +-
.../{BaseConfig.java => plugin/JarHellTest.java} | 24 +--
.../service/core/plugin/PluginClassLoaderTest.java | 58 +++++++
.../plugin/PluginServiceTest.java} | 25 +--
.../hive/CreateHiveTableEventSelectorTest.java | 45 ++++++
.../workflow/TaskEventListenerFactoryTest.java | 61 +++++++
.../CreateBusinessWorkflowDefinitionTest.java | 4 +-
.../src/test/resources/application-test.properties | 8 +-
.../resources/plugins/inlong-manager-plugin.jar | Bin 0 -> 12277 bytes
.../web/controller/openapi/SortControllerTest.java | 14 +-
inlong-manager/pom.xml | 6 +-
48 files changed, 1302 insertions(+), 205 deletions(-)
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/EventSelector.java
similarity index 69%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/EventSelector.java
index 3a480eb..a8f66a9 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/EventSelector.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+import org.apache.inlong.manager.common.model.WorkflowContext;
-@Configuration
-public class BaseConfig {
- @Bean
- public RestTemplate restTemplate() {
- return new RestTemplate();
- }
+/**
+ * A selector that decides whether an event is selected for a given workflow context.
+ */
+public interface EventSelector {
+
+ boolean accept(WorkflowContext context);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/DataSourceOperateListener.java
similarity index 53%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/DataSourceOperateListener.java
index 75d75ac..768453c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/DataSourceOperateListener.java
@@ -15,21 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
-import java.util.List;
+public interface DataSourceOperateListener extends TaskEventListener {
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+ DataSourceOperateListener DEFAULT_SOURCE_OPERATE_LISTENER = new DataSourceOperateListener() {
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ return ListenerResult.success();
+ }
+
+ @Override
+ public boolean async() {
+ return false;
+ }
+ };
- /**
- * Select list of task by cluster name.
- * @param clusterName Name of sort cluster.
- * @return List of tasks, including task name and sink type.
- */
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/QueueOperateListener.java
similarity index 54%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/QueueOperateListener.java
index 75d75ac..47b46c9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/QueueOperateListener.java
@@ -15,21 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
-import java.util.List;
+public interface QueueOperateListener extends TaskEventListener {
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+ QueueOperateListener DEFAULT_QUEUE_OPERATE_LISTENER = new QueueOperateListener() {
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ return ListenerResult.success();
+ }
+
+ @Override
+ public boolean async() {
+ return false;
+ }
+ };
- /**
- * Select list of task by cluster name.
- * @param clusterName Name of sort cluster.
- * @return List of tasks, including task name and sink type.
- */
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/SortOperateListener.java
similarity index 54%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/SortOperateListener.java
index 75d75ac..78d3ce4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/SortOperateListener.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
-import java.util.List;
+public interface SortOperateListener extends TaskEventListener {
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+ SortOperateListener DEFAULT_SORT_OPERATE_LISTENER = new SortOperateListener() {
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ return ListenerResult.success();
+ }
- /**
- * Select list of task by cluster name.
- * @param clusterName Name of sort cluster.
- * @return List of tasks, including task name and sink type.
- */
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+ @Override
+ public boolean async() {
+ return false;
+ }
+ };
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/StorageOperateListener.java
similarity index 53%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/StorageOperateListener.java
index 75d75ac..45a9eb6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/event/task/StorageOperateListener.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.event.task;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.model.WorkflowContext;
-import java.util.List;
+public interface StorageOperateListener extends TaskEventListener {
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
+ StorageOperateListener DEFAULT_STORAGE_OPERATE_LISTENER = new StorageOperateListener() {
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ return ListenerResult.success();
+ }
- /**
- * Select list of task by cluster name.
- * @param clusterName Name of sort cluster.
- * @return List of tasks, including task name and sink type.
- */
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+ @Override
+ public boolean async() {
+ return false;
+ }
+ };
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
similarity index 69%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
index 3a480eb..a072c77 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
@@ -15,17 +15,8 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.plugin;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
-
-@Configuration
-public class BaseConfig {
- @Bean
- public RestTemplate restTemplate() {
- return new RestTemplate();
- }
+public interface Plugin {
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
similarity index 69%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
index 3a480eb..2fff722 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
@@ -15,17 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.plugin;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+public interface PluginBinder {
-@Configuration
-public class BaseConfig {
- @Bean
- public RestTemplate restTemplate() {
- return new RestTemplate();
- }
+ void acceptPlugin(Plugin plugin);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
similarity index 54%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
index 75d75ac..874db28 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
@@ -15,21 +15,38 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.plugin;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
-
-import java.util.List;
+import lombok.Data;
/**
- * Sort cluster config service.
+ * pluginDefinition should be defined in *.jar/META-INF/plugin.yaml
+ * for example:
+ * name: test
+ * description: this plugin is used for testing
+ * pluginClass: org.apache.inlong.plugin.testPlugin
+ * javaVersion: 1.8 or 8
*/
-public interface SortClusterConfigService {
+@Data
+public class PluginDefinition {
+
+ /**
+ * name of plugin
+ */
+ private String name;
+
+ /**
+ * description of plugin to be used for user help
+ */
+ private String description;
+
+ /**
+ * javaVersion of the plugin, used for validity checking
+ */
+ private String javaVersion;
/**
- * Select list of task by cluster name.
- * @param clusterName Name of sort cluster.
- * @return List of tasks, including task name and sink type.
+ * the full class name of plugin
*/
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+ private String pluginClass;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/ProcessPlugin.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/ProcessPlugin.java
new file mode 100644
index 0000000..9192a5f
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/ProcessPlugin.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.plugin;
+
+import java.util.Map;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
+
+public interface ProcessPlugin extends Plugin {
+
+ default Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
+ return null;
+ }
+
+ default Map<StorageOperateListener, EventSelector> createStorageOperateListeners() {
+ return null;
+ }
+
+ default Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
+ return null;
+ }
+
+ default Map<SortOperateListener, EventSelector> createSortOperateListeners() {
+ return null;
+ }
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfgiEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfigEntity.java
similarity index 94%
rename from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfgiEntity.java
rename to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfigEntity.java
index ea2e2dd..f9f0319 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfgiEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortClusterConfigEntity.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
@Data
@Builder
-public class SortClusterConfgiEntity implements Serializable {
+public class SortClusterConfigEntity implements Serializable {
private Integer id;
private String clusterName;
private String taskName;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java
index 567514e..0fba607 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortClusterConfgiEntityMapper.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.dao.mapper;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.springframework.stereotype.Repository;
import java.util.List;
@@ -26,16 +26,16 @@ import java.util.List;
public interface SortClusterConfgiEntityMapper {
int deleteByPrimaryKey(Integer id);
- int insert(SortClusterConfgiEntity record);
+ int insert(SortClusterConfigEntity record);
- int insertSelective(SortClusterConfgiEntity record);
+ int insertSelective(SortClusterConfigEntity record);
- SortClusterConfgiEntity selectByPrimaryKey(Integer id);
+ SortClusterConfigEntity selectByPrimaryKey(Integer id);
- int updateByPrimaryKeySelective(SortClusterConfgiEntity record);
+ int updateByPrimaryKeySelective(SortClusterConfigEntity record);
- int updateByPrimaryKey(SortClusterConfgiEntity record);
+ int updateByPrimaryKey(SortClusterConfigEntity record);
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+ List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml
index 6a2020e..4064a51 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/SortClusterConfgiEntityMapper.xml
@@ -19,7 +19,7 @@
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
<id column="id" jdbcType="INTEGER" property="id" />
<result column="cluster_name" jdbcType="VARCHAR" property="clusterName" />
<result column="task_name" jdbcType="VARCHAR" property="taskName" />
@@ -38,13 +38,13 @@
delete from sort_cluster_config
where id = #{id,jdbcType=INTEGER}
</delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
insert into sort_cluster_config (id, cluster_name, task_name,
sink_type)
values (#{id,jdbcType=INTEGER}, #{clusterName,jdbcType=VARCHAR}, #{taskName,jdbcType=VARCHAR},
#{sinkType,jdbcType=VARCHAR})
</insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+ <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
insert into sort_cluster_config
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -75,7 +75,7 @@
</if>
</trim>
</insert>
- <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+ <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
update sort_cluster_config
<set>
<if test="clusterName != null">
@@ -90,7 +90,7 @@
</set>
where id = #{id,jdbcType=INTEGER}
</update>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity">
+ <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.SortClusterConfigEntity">
update sort_cluster_config
set cluster_name = #{clusterName,jdbcType=VARCHAR},
task_name = #{taskName,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-plugin-examples/pom.xml b/inlong-manager/manager-plugin-examples/pom.xml
new file mode 100644
index 0000000..a4e1198
--- /dev/null
+++ b/inlong-manager/manager-plugin-examples/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>inlong-manager</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <version>0.13.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>inlong-manager-plugin</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-common</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <includes>
+ <include>META-INF/*</include>
+ </includes>
+ </resource>
+ </resources>
+ <finalName>${project.name}</finalName>
+ </build>
+</project>
\ No newline at end of file
diff --git a/inlong-manager/manager-plugin-examples/src/main/java/org/apache/inlong/manager/plugin/EmptyProcessPlugin.java b/inlong-manager/manager-plugin-examples/src/main/java/org/apache/inlong/manager/plugin/EmptyProcessPlugin.java
new file mode 100644
index 0000000..64d7cb1
--- /dev/null
+++ b/inlong-manager/manager-plugin-examples/src/main/java/org/apache/inlong/manager/plugin/EmptyProcessPlugin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+
+public class EmptyProcessPlugin implements ProcessPlugin {
+
+ @Override
+ public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
+ return new LinkedHashMap<>();
+ }
+
+ @Override
+ public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
+ return new LinkedHashMap<>();
+ }
+
+ @Override
+ public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
+ return ProcessPlugin.super.createSortOperateListeners();
+ }
+
+ @Override
+ public Map<StorageOperateListener, EventSelector> createStorageOperateListeners() {
+ return ProcessPlugin.super.createStorageOperateListeners();
+ }
+}
diff --git a/inlong-manager/manager-plugin-examples/src/main/resources/META-INF/plugin.yaml b/inlong-manager/manager-plugin-examples/src/main/resources/META-INF/plugin.yaml
new file mode 100644
index 0000000..689a149
--- /dev/null
+++ b/inlong-manager/manager-plugin-examples/src/main/resources/META-INF/plugin.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: example
+description: example for manager plugin
+javaVersion: 1.8
+pluginClass: org.apache.inlong.manager.plugin.EmptyProcessPlugin
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 9eecbc4..fd33e31 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -128,6 +128,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
index 75d75ac..1e98709 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import java.util.List;
@@ -31,5 +31,5 @@ public interface SortClusterConfigService {
* @param clusterName Name of sort cluster.
* @return List of tasks, including task name and sink type.
*/
- List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName);
+ List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
index 2f78f64..51f9dc3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.impl;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
import org.apache.inlong.manager.service.core.SortClusterConfigService;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +35,7 @@ public class SortClusterConfigServiceImpl implements SortClusterConfigService {
private SortClusterConfgiEntityMapper sortClusterConfgiEntityMapper;
@Override
- public List<SortClusterConfgiEntity> selectTasksByClusterName(String clusterName) {
+ public List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName) {
return sortClusterConfgiEntityMapper.selectTasksByClusterName(clusterName);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 98f905f..3bea2dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -19,9 +19,9 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse;
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SinkType;
import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.SortTaskConfig;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
import org.apache.inlong.manager.service.core.SortClusterConfigService;
import org.apache.inlong.manager.service.core.SortTaskIdParamService;
import org.apache.inlong.manager.service.core.SortService;
@@ -56,8 +56,8 @@ public class SortServiceImpl implements SortService {
}
// check if there is any task.
- List<SortClusterConfgiEntity> tasks =
- sortClusterConfigService.selectTasksByClusterName(clusterName);
+ List<SortClusterConfigEntity> tasks = sortClusterConfigService.selectTasksByClusterName(clusterName);
+
if (tasks == null || tasks.isEmpty()) {
String errMsg = "There is not any task for cluster" + clusterName;
LOGGER.info(errMsg);
@@ -77,7 +77,7 @@ public class SortServiceImpl implements SortService {
return SortClusterConfigResponse.builder().tasks(taskConfigs).msg("success").build();
}
- private SortTaskConfig getTaskConfig(SortClusterConfgiEntity clusterConfig) {
+ private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
List<Map<String, String>> idParams =
sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
// TODO add method that get sink params
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/JarHell.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/JarHell.java
new file mode 100644
index 0000000..5dd5520
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/JarHell.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import com.google.common.collect.Lists;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+public class JarHell {
+
+ public static final JavaVersion CURRENT_VERSION = new JavaVersion(Lists.newArrayList(1, 8), null);
+
+ public static class JavaVersion {
+
+ private final List<Integer> version;
+ private final String prePart;
+
+ public List<Integer> getVersion() {
+ return version;
+ }
+
+ private JavaVersion(List<Integer> version, String prePart) {
+ this.prePart = prePart;
+ if (version.size() >= 2 && version.get(0) == 1 && version.get(1) == 8) {
+ // for Java 8 there is ambiguity since both 1.8 and 8 are supported,
+ version = new ArrayList<>(version.subList(1, version.size()));
+ }
+ this.version = Collections.unmodifiableList(version);
+ }
+
+ @Override
+ public String toString() {
+ return StringUtils.join(version, '.') + "-" + prePart;
+ }
+
+ public int compareTo(JavaVersion o) {
+ int len = Math.max(version.size(), o.version.size());
+ for (int i = 0; i < len; i++) {
+ int d = (i < version.size() ? version.get(i) : 0);
+ int s = (i < o.version.size() ? o.version.get(i) : 0);
+ if (s < d) {
+ return 1;
+ }
+ if (s > d) {
+ return -1;
+ }
+ }
+ if (prePart != null && o.prePart == null) {
+ return -1;
+ } else if (prePart == null && o.prePart != null) {
+ return 1;
+ } else if (prePart != null && o.prePart != null) {
+ return comparePrePart(prePart, o.prePart);
+ }
+ return 0;
+ }
+
+ private int comparePrePart(String prePart, String otherPrePart) {
+ if (prePart.matches("\\d+")) {
+ return otherPrePart.matches("\\d+")
+ ? (new BigInteger(prePart)).compareTo(new BigInteger(otherPrePart)) : -1;
+ } else {
+ return otherPrePart.matches("\\d+")
+ ? 1 : prePart.compareTo(otherPrePart);
+ }
+ }
+ }
+
+ private JarHell() {
+ }
+
+ public static void checkJavaVersion(String resource, String javaVersion) {
+ JavaVersion version = parse(javaVersion);
+ if (CURRENT_VERSION.compareTo(version) < 0) {
+ throw new IllegalArgumentException(
+ String.format("%s requires Java %s:, your system: %s", resource, CURRENT_VERSION,
+ javaVersion));
+ }
+ }
+
+ public static JavaVersion parse(String value) {
+ Objects.requireNonNull(value);
+ String prePart = null;
+ if (!isValid(value)) {
+ throw new IllegalArgumentException("Java version string [" + value + "] could not be parsed.");
+ }
+ List<Integer> version = new ArrayList<>();
+ String[] parts = value.split("-");
+ String[] numericComponents;
+ if (parts.length == 1) {
+ numericComponents = value.split("\\.");
+ } else if (parts.length == 2) {
+ numericComponents = parts[0].split("\\.");
+ prePart = parts[1];
+ } else {
+ throw new IllegalArgumentException("Java version string [" + value + "] could not be parsed.");
+ }
+
+ for (String component : numericComponents) {
+ version.add(Integer.valueOf(component));
+ }
+ return new JavaVersion(version, prePart);
+ }
+
+ public static boolean isValid(String value) {
+ return value.matches("^0*[0-9]+(\\.[0-9]+)*(-[a-zA-Z0-9]+)?$");
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
new file mode 100644
index 0000000..2071f48
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.stream.Collectors;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+
+@Slf4j
+public class PluginClassLoader extends URLClassLoader {
+
+ public static final String PLUGIN_PATH = "META-INF/plugin.yaml";
+
+ /**
+ * plugin.yaml should less than 1k
+ */
+ public static final int PLUGIN_DEF_CAPACITY = 1024;
+
+ /**
+ * pluginName -> pluginDefinition
+ */
+ private Map<String, PluginDefinition> pluginDefinitionMap = new HashMap<>();
+
+ private File pluginDirectory;
+
+ private ObjectMapper yamlMapper;
+
+ private PluginClassLoader(URL url, ClassLoader parent) throws IOException {
+ super(new URL[]{url}, parent);
+ this.pluginDirectory = new File(url.getPath());
+ initYamlMapper();
+ loadPluginDefinition();
+ }
+
+ public static PluginClassLoader getFromPluginUrl(String url, ClassLoader parent) {
+ checkClassLoader(parent);
+ checkUrl(url);
+ return AccessController.doPrivileged(new PrivilegedAction<PluginClassLoader>() {
+ @SneakyThrows
+ @Override
+ public PluginClassLoader run() {
+ return new PluginClassLoader(new URL("file://" + url), parent);
+ }
+ });
+ }
+
+ public Map<String, PluginDefinition> getPluginDefinitions() {
+ return this.pluginDefinitionMap;
+ }
+
+ private void initYamlMapper() {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+ mapper.setSerializationInclusion(Include.NON_NULL);
+ this.yamlMapper = mapper;
+ }
+
+ /**
+ * load pluginDefinition in **.jar/META-INF/plugin.yaml
+ */
+ private void loadPluginDefinition() throws IOException {
+ List<PluginDefinition> definitions = new ArrayList();
+ for (File jarFile : pluginDirectory.listFiles()) {
+ if (!jarFile.getName().endsWith(".jar")) {
+ log.warn("{}' is not plugin jar , please check", jarFile);
+ continue;
+ }
+ JarFile pluginJar = new JarFile(jarFile);
+ String pluginDef = readPluginDef(pluginJar);
+ pluginDef = pluginDef.replaceAll("[\\x00]+","");
+ PluginDefinition definition = yamlMapper.readValue(pluginDef, PluginDefinition.class);
+ addURL(new URL("file://" + jarFile.getAbsolutePath()));
+ checkPluginValid(jarFile, definition);
+ definitions.add(definition);
+ }
+ pluginDefinitionMap = definitions.stream()
+ .collect(Collectors.toMap(definition -> definition.getName(), definition -> definition));
+ }
+
+ private void checkPluginValid(File jarFile, PluginDefinition pluginDefinition) {
+ if (StringUtils.isEmpty(pluginDefinition.getName())) {
+ throw new RuntimeException(String.format("%s should define pluginName in plugin.yaml", jarFile.getName()));
+ }
+ if (StringUtils.isEmpty(pluginDefinition.getPluginClass())) {
+ throw new RuntimeException(String.format("%s should define pluginClass in plugin.yaml", jarFile.getName()));
+ }
+ try {
+ this.loadClass(pluginDefinition.getPluginClass());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(
+ String.format("pluginClass '%s' could not be found in %s", pluginDefinition.getPluginClass(),
+ jarFile.getName()));
+ }
+ if (StringUtils.isEmpty(pluginDefinition.getDescription())) {
+ log.warn(String.format("%s should define description in plugin.yaml", jarFile.getName()));
+ }
+ if (StringUtils.isEmpty(pluginDefinition.getJavaVersion())) {
+ throw new RuntimeException(String.format("%s should define javaVersion in plugin.yaml", jarFile.getName()));
+ }
+ JarHell.checkJavaVersion(pluginDefinition.getName(), pluginDefinition.getJavaVersion());
+ }
+
+ private String readPluginDef(JarFile jar) throws IOException {
+ JarEntry entry = jar.getJarEntry(PLUGIN_PATH);
+ if (entry == null) {
+ throw new RuntimeException(String.format("%s is not found in jar '%s'", PLUGIN_PATH, jar.getName()));
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(PLUGIN_DEF_CAPACITY);
+ int bt;
+ try (InputStream is = jar.getInputStream(entry)) {
+ while ((bt = is.read()) != -1) {
+ buffer.put((byte) bt);
+ }
+ }
+ return new String(buffer.array(), StandardCharsets.UTF_8);
+ }
+
+ private static void checkClassLoader(ClassLoader classLoader) {
+ if (classLoader == null) {
+ throw new RuntimeException("parent classLoader should not be null");
+ }
+ }
+
+ private static void checkUrl(String url) {
+ if (StringUtils.isBlank(url)) {
+ throw new IllegalArgumentException("url should not be empty");
+ }
+ File pluginDirectory = new File(url);
+ if (!pluginDirectory.exists()) {
+ throw new RuntimeException(String.format("pluginDirectory '%s' is not exists", pluginDirectory));
+ }
+ if (!pluginDirectory.isDirectory()) {
+ throw new RuntimeException(String.format("pluginDirectory '%s' should be directory", pluginDirectory));
+ }
+ if (!pluginDirectory.canRead()) {
+ throw new RuntimeException(String.format("pluginDirectory '%s' is not readable", pluginDirectory));
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginService.java
new file mode 100644
index 0000000..7c364e2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/plugin/PluginService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.PostConstruct;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class PluginService {
+
+ public static final String DEFAULT_PLUGIN_LOC = "/plugins";
+
+ @Setter
+ @Getter
+ @Value("${plugin.location?:''}")
+ private String pluginLoc;
+
+ @Getter
+ @Autowired
+ private List<PluginBinder> pluginBinders;
+
+ @Getter
+ private List<Plugin> plugins = new ArrayList<>();
+
+ public PluginService() {
+ if (StringUtils.isBlank(pluginLoc)) {
+ pluginLoc = DEFAULT_PLUGIN_LOC;
+ }
+ pluginReload();
+ }
+
+ public void pluginReload() {
+ Path path = Paths.get(pluginLoc).toAbsolutePath();
+ log.info("search for plugin in {}", pluginLoc);
+ if (!path.toFile().exists()) {
+ log.warn("plugin directory not found");
+ return;
+ }
+ PluginClassLoader pluginLoader = PluginClassLoader.getFromPluginUrl(path.toString(),
+ Thread.currentThread().getContextClassLoader());
+ Map<String, PluginDefinition> pluginDefinitions = pluginLoader.getPluginDefinitions();
+ if (MapUtils.isEmpty(pluginDefinitions)) {
+ log.warn("pluginDefinition not found in {}", pluginLoc);
+ return;
+ }
+ List<Plugin> plugins = new ArrayList<>();
+ for (PluginDefinition pluginDefinition : pluginDefinitions.values()) {
+ String pluginClassName = pluginDefinition.getPluginClass();
+ try {
+ Class pluginClass = pluginLoader.loadClass(pluginClassName);
+ Object plugin = pluginClass.getDeclaredConstructor().newInstance();
+ plugins.add((Plugin) plugin);
+ } catch (Throwable e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ this.plugins.addAll(plugins);
+ for (PluginBinder pluginBinder : pluginBinders) {
+ for (Plugin plugin : plugins) {
+ pluginBinder.acceptPlugin(plugin);
+ }
+ }
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelector.java
new file mode 100644
index 0000000..35d4cc2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelector.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.hive;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.model.definition.ProcessForm;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.StorageService;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CreateHiveTableEventSelector implements EventSelector {
+
+ @Autowired
+ private StorageService storageService;
+ @Autowired
+ private DataStreamEntityMapper streamMapper;
+
+ @Override
+ public boolean accept(WorkflowContext context) {
+ ProcessForm processForm = context.getProcessForm();
+ if (processForm == null || !(processForm instanceof BusinessResourceWorkflowForm)) {
+ return false;
+ }
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) processForm;
+ if (form.getBusinessInfo() == null || StringUtils.isEmpty(form.getBusinessInfo().getInlongGroupId())) {
+ return false;
+ }
+ String groupId = form.getInlongGroupId();
+ List<String> dsForHive = storageService.filterStreamIdByStorageType(groupId, BizConstant.STORAGE_HIVE,
+ streamMapper.selectByGroupId(groupId)
+ .stream()
+ .map(DataStreamEntity::getInlongStreamId)
+ .collect(Collectors.toList()));
+ //todo check if create hive automatically
+ if (CollectionUtils.isEmpty(dsForHive)) {
+ log.warn("groupId={} streamId={} does not have storage, skip to create hive table ",
+ groupId, form.getInlongStreamId());
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
index 1975c20..420afa2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.thirdpart.hive;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
*/
@Service
@Slf4j
-public class CreateHiveTableForStreamListener implements TaskEventListener {
+public class CreateHiveTableForStreamListener implements StorageOperateListener {
@Autowired
private StorageHiveEntityMapper hiveEntityMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
index 45ac9e0..98c41c1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.thirdpart.hive;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
*/
@Service
@Slf4j
-public class CreateHiveTableListener implements TaskEventListener {
+public class CreateHiveTableListener implements StorageOperateListener {
@Autowired
private StorageHiveEntityMapper hiveEntityMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
index 54923dd..c92e0a7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -20,7 +20,12 @@ package org.apache.inlong.manager.service.thirdpart.mq;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -30,11 +35,6 @@ import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.StorageService;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -44,7 +44,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CreatePulsarGroupForStreamTaskListener implements TaskEventListener {
+public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListener {
@Autowired
private ClusterBean clusterBean;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
index 16f230f..ddea360 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -28,11 +33,6 @@ import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -42,7 +42,7 @@ import org.springframework.stereotype.Component;
*/
@Component
@Slf4j
-public class CreatePulsarGroupTaskListener implements TaskEventListener {
+public class CreatePulsarGroupTaskListener implements QueueOperateListener {
@Autowired
private ClusterBean clusterBean;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
index 65844b8..7673810 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -30,11 +35,6 @@ import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -44,7 +44,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component()
-public class CreatePulsarResourceTaskListener implements TaskEventListener {
+public class CreatePulsarResourceTaskListener implements QueueOperateListener {
@Autowired
PulsarOptService pulsarOptService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
index 0a1e72e..8eaf2e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -27,11 +32,6 @@ import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CreatePulsarTopicForStreamTaskListener implements TaskEventListener {
+public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListener {
@Autowired
private ClusterBean clusterBean;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
index 1911e38..19ba81b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
@@ -20,6 +20,11 @@ package org.apache.inlong.manager.service.thirdpart.mq;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ReTryConfigBean;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
@@ -27,18 +32,13 @@ import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Slf4j
-public class CreateTubeGroupTaskListener implements TaskEventListener {
+public class CreateTubeGroupTaskListener implements QueueOperateListener {
@Autowired
BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
index 02165a6..1933351 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.thirdpart.mq;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
-import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -36,7 +36,7 @@ import org.springframework.stereotype.Component;
*/
@Component
@Slf4j
-public class CreateTubeTopicTaskListener implements TaskEventListener {
+public class CreateTubeTopicTaskListener implements QueueOperateListener {
@Autowired
private TubeMqOptService tubeMqOptService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarEventSelector.java
new file mode 100644
index 0000000..8e1be46
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarEventSelector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+
+/**
+ * Event selector that accepts a workflow only when the middleware type of its
+ * business info is {@link BizConstant#MIDDLEWARE_PULSAR}.
+ */
+@Slf4j
+public class PulsarEventSelector implements EventSelector {
+
+    /**
+     * Decides whether the Pulsar-related listeners apply to the given workflow.
+     *
+     * @param context workflow context; its process form is cast to {@link BusinessResourceWorkflowForm}
+     * @return true if the middleware type equals {@code MIDDLEWARE_PULSAR} ignoring case, false otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        BusinessResourceWorkflowForm workflowForm = (BusinessResourceWorkflowForm) context.getProcessForm();
+        String mqType = workflowForm.getBusinessInfo().getMiddlewareType();
+        boolean isPulsar = BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(mqType);
+        if (!isPulsar) {
+            // Only the rejection path is logged, so it is visible why the listener was skipped.
+            log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
+                    workflowForm.getInlongGroupId(), mqType);
+        }
+        return isPulsar;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeEventSelector.java
new file mode 100644
index 0000000..415a03a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeEventSelector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+
+/**
+ * Event selector that accepts a workflow only when the middleware type of its
+ * business info is {@link BizConstant#MIDDLEWARE_TUBE}.
+ */
+@Slf4j
+public class TubeEventSelector implements EventSelector {
+
+    /**
+     * Decides whether the Tube-related listeners apply to the given workflow.
+     *
+     * @param context workflow context; its process form is cast to {@link BusinessResourceWorkflowForm}
+     * @return true if the middleware type equals {@code MIDDLEWARE_TUBE} ignoring case, false otherwise
+     */
+    @Override
+    public boolean accept(WorkflowContext context) {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+        BusinessInfo businessInfo = form.getBusinessInfo();
+        if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
+            return true;
+        }
+        // Argument order must follow the placeholders: groupId first, then the middleware type.
+        log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
+                form.getInlongGroupId(), businessInfo.getMiddlewareType());
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index 5b6f1a9..bae7133 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -28,6 +28,11 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.event.ListenerResult;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.TaskEvent;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
@@ -40,11 +45,6 @@ import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
-import org.apache.inlong.manager.common.event.ListenerResult;
-import org.apache.inlong.manager.common.event.task.TaskEvent;
-import org.apache.inlong.manager.common.event.task.TaskEventListener;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.model.WorkflowContext;
import org.apache.inlong.sort.ZkTools;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
@@ -64,7 +64,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class PushHiveConfigTaskListener implements TaskEventListener {
+public class PushHiveConfigTaskListener implements SortOperateListener {
private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<>();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/ZkSortEventSelector.java
similarity index 62%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/ZkSortEventSelector.java
index 3a480eb..4a7670f 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/ZkSortEventSelector.java
@@ -15,17 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.thirdpart.sort;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.springframework.stereotype.Component;
-@Configuration
-public class BaseConfig {
- @Bean
- public RestTemplate restTemplate() {
- return new RestTemplate();
- }
+/**
+ * Event selector paired with the sort config listener.
+ * The current implementation accepts every workflow context unconditionally.
+ */
+@Component
+@Slf4j
+public class ZkSortEventSelector implements EventSelector {
+ /**
+ * Always accepts the given context.
+ *
+ * @param context workflow context; not inspected by the current implementation
+ * @return always true
+ */
+ @Override
+ public boolean accept(WorkflowContext context) {
+ // TODO: check whether the sort config should be pushed to ZooKeeper
+ return true;
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactory.java
new file mode 100644
index 0000000..810b7dd
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactory.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.PostConstruct;
+import org.apache.commons.collections.MapUtils;
+import org.apache.inlong.manager.common.event.EventSelector;
+import org.apache.inlong.manager.common.event.task.DataSourceOperateListener;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.event.task.SortOperateListener;
+import org.apache.inlong.manager.common.event.task.StorageOperateListener;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableEventSelector;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.PulsarEventSelector;
+import org.apache.inlong.manager.service.thirdpart.mq.TubeEventSelector;
+import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
+import org.apache.inlong.manager.service.thirdpart.sort.ZkSortEventSelector;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Registry of task event listeners, grouped by kind (data source, storage, queue, sort).
+ *
+ * <p>Every listener is stored together with an {@link EventSelector}; a listener is handed out
+ * for a workflow only when its selector accepts that workflow's context. The registries are
+ * {@link LinkedHashMap}s, so listeners are returned in the order they were registered.
+ * Plugins can contribute additional listener/selector pairs through {@link #acceptPlugin(Plugin)}.
+ */
+@Component
+public class TaskEventListenerFactory implements PluginBinder {
+
+    private Map<DataSourceOperateListener, EventSelector> sourceOperateListeners;
+
+    private Map<StorageOperateListener, EventSelector> storageOperateListeners;
+
+    private Map<QueueOperateListener, EventSelector> queueOperateListeners;
+
+    private Map<SortOperateListener, EventSelector> sortOperateListeners;
+
+    @Autowired
+    private CreateTubeTopicTaskListener createTubeTopicTaskListener;
+    @Autowired
+    private CreateTubeGroupTaskListener createTubeGroupTaskListener;
+    @Autowired
+    private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
+    @Autowired
+    private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
+
+    @Autowired
+    private CreateHiveTableListener createHiveTableListener;
+    @Autowired
+    private CreateHiveTableEventSelector createHiveTableEventSelector;
+
+    @Autowired
+    private PushHiveConfigTaskListener pushHiveConfigTaskListener;
+    @Autowired
+    private ZkSortEventSelector zkSortEventSelector;
+
+    /**
+     * Creates the four registries and registers the built-in listeners with their selectors.
+     * No data source listener is registered by default.
+     */
+    @PostConstruct
+    public void init() {
+        sourceOperateListeners = new LinkedHashMap<>();
+        storageOperateListeners = new LinkedHashMap<>();
+        storageOperateListeners.put(createHiveTableListener, createHiveTableEventSelector);
+        queueOperateListeners = new LinkedHashMap<>();
+        queueOperateListeners.put(createTubeTopicTaskListener, new TubeEventSelector());
+        queueOperateListeners.put(createTubeGroupTaskListener, new TubeEventSelector());
+        queueOperateListeners.put(createPulsarResourceTaskListener, new PulsarEventSelector());
+        queueOperateListeners.put(createPulsarGroupTaskListener, new PulsarEventSelector());
+        sortOperateListeners = new LinkedHashMap<>();
+        sortOperateListeners.put(pushHiveConfigTaskListener, zkSortEventSelector);
+    }
+
+    /**
+     * Returns the data source listeners whose selector accepts the given context.
+     *
+     * @param context workflow context passed to every selector
+     * @return accepted listeners in registration order; empty if none is accepted
+     */
+    public List<DataSourceOperateListener> getDSOperateListener(WorkflowContext context) {
+        return selectListeners(sourceOperateListeners, context);
+    }
+
+    /**
+     * Returns the storage listeners whose selector accepts the given context.
+     *
+     * @param context workflow context passed to every selector
+     * @return accepted listeners in registration order; empty if none is accepted
+     */
+    public List<StorageOperateListener> getStorageOperateListener(WorkflowContext context) {
+        return selectListeners(storageOperateListeners, context);
+    }
+
+    /**
+     * Returns the queue listeners whose selector accepts the given context.
+     *
+     * @param context workflow context passed to every selector
+     * @return accepted listeners in registration order; empty if none is accepted
+     */
+    public List<QueueOperateListener> getQueueOperateListener(WorkflowContext context) {
+        return selectListeners(queueOperateListeners, context);
+    }
+
+    /**
+     * Returns the sort listeners whose selector accepts the given context.
+     *
+     * @param context workflow context passed to every selector
+     * @return accepted listeners in registration order; empty if none is accepted
+     */
+    public List<SortOperateListener> getSortOperateListener(WorkflowContext context) {
+        return selectListeners(sortOperateListeners, context);
+    }
+
+    /**
+     * Adds the listener/selector pairs created by a {@link ProcessPlugin} to the registries.
+     * Plugins of any other type are ignored. Each factory method of the plugin is invoked
+     * exactly once, and null or empty results are skipped.
+     *
+     * @param plugin the plugin being bound
+     */
+    @Override
+    public void acceptPlugin(Plugin plugin) {
+        if (!(plugin instanceof ProcessPlugin)) {
+            return;
+        }
+        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
+        registerAll(sourceOperateListeners, processPlugin.createSourceOperateListeners());
+        registerAll(storageOperateListeners, processPlugin.createStorageOperateListeners());
+        registerAll(queueOperateListeners, processPlugin.createQueueOperateListeners());
+        registerAll(sortOperateListeners, processPlugin.createSortOperateListeners());
+    }
+
+    /**
+     * Collects the keys of {@code candidates} whose selector is non-null and accepts {@code context}.
+     */
+    private static <T> List<T> selectListeners(Map<T, EventSelector> candidates, WorkflowContext context) {
+        List<T> accepted = new ArrayList<>();
+        for (Map.Entry<T, EventSelector> entry : candidates.entrySet()) {
+            EventSelector selector = entry.getValue();
+            if (selector != null && selector.accept(context)) {
+                accepted.add(entry.getKey());
+            }
+        }
+        return accepted;
+    }
+
+    /**
+     * Copies {@code additions} into {@code target} unless {@code additions} is null or empty.
+     */
+    private static <T> void registerAll(Map<T, EventSelector> target, Map<T, EventSelector> additions) {
+        if (MapUtils.isNotEmpty(additions)) {
+            target.putAll(additions);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseConfig.java
similarity index 95%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseConfig.java
index 3a480eb..653bdb5 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseTest.java
similarity index 97%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseTest.java
index 06b0c18..12c33ab 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/BaseTest.java
@@ -1,4 +1,3 @@
-package org.apache.inlong.manager.service.core;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,6 +15,8 @@ package org.apache.inlong.manager.service.core;
* limitations under the License.
*/
+package org.apache.inlong.manager.service;
+
import org.junit.runner.RunWith;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/JarHellTest.java
similarity index 60%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/JarHellTest.java
index 3a480eb..83742c8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/BaseConfig.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/JarHellTest.java
@@ -15,17 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.core.plugin;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
+import org.junit.Assert;
+import org.junit.Test;
-@Configuration
-public class BaseConfig {
- @Bean
- public RestTemplate restTemplate() {
- return new RestTemplate();
+/**
+ * Tests for {@link JarHell#checkJavaVersion}.
+ */
+public class JarHellTest {
+
+ /**
+ * Calls the version check with target versions "1.8" and "1.81" and inspects
+ * the exception raised for the latter, if any.
+ */
+ @Test
+ public void testJavaVersion() {
+ // Not wrapped in try/catch: any exception thrown for "1.8" fails the test.
+ JarHell.checkJavaVersion("test_java", "1.8");
+ try {
+ JarHell.checkJavaVersion("test_java", "1.81");
+ // NOTE(review): there is no Assert.fail() here, so the test also passes when
+ // no exception is thrown - confirm whether "1.81" must always be rejected.
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof IllegalArgumentException);
+ String msg = e.getMessage();
+ // "requires Java 1.8" is also a prefix of "requires Java 1.81".
+ Assert.assertTrue(msg.contains("requires Java 1.8"));
+ }
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
new file mode 100644
index 0000000..4bf8987
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginClassLoaderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.plugin;
+
+import com.google.common.collect.Lists;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests loading plugin definitions and plugin classes through {@link PluginClassLoader}.
+ */
+public class PluginClassLoaderTest {
+
+    /**
+     * Loads the "plugins" directory located under the test classpath root, expects exactly one
+     * plugin definition, then instantiates its plugin class through the plugin class loader and
+     * expects the instance to be a {@link ProcessPlugin}.
+     */
+    @Test
+    public void testLoadPlugin() {
+        String path = this.getClass().getClassLoader().getResource("").getPath();
+        PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
+                Thread.currentThread().getContextClassLoader());
+        Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
+        Assert.assertEquals(1, pluginDefinitionMap.size());
+        PluginDefinition pluginDefinition = Lists.newArrayList(pluginDefinitionMap.values()).get(0);
+        Assert.assertNotNull(pluginDefinition);
+        String pluginClass = pluginDefinition.getPluginClass();
+        Assert.assertTrue(StringUtils.isNotEmpty(pluginClass));
+        try {
+            Class<?> cls = pluginClassLoader.loadClass(pluginClass);
+            Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
+            Assert.assertTrue(plugin instanceof ProcessPlugin);
+        } catch (ClassNotFoundException
+                | NoSuchMethodException
+                | InstantiationException
+                | IllegalAccessException
+                | InvocationTargetException e) {
+            // Any failure to load or instantiate the plugin fails the test; keep the cause
+            // attached so the report shows why loading failed.
+            throw new AssertionError("failed to load or instantiate plugin class " + pluginClass, e);
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginServiceTest.java
similarity index 56%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginServiceTest.java
index ea3de03..3ce65fc 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/plugin/PluginServiceTest.java
@@ -15,25 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.business;
+package org.apache.inlong.manager.service.core.plugin;
-import org.apache.inlong.manager.service.core.BaseTest;
+import java.util.List;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.ProcessPlugin;
+import org.apache.inlong.manager.service.BaseTest;
import org.junit.Assert;
-import org.apache.inlong.manager.common.model.definition.Process;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
-public class CreateBusinessWorkflowDefinitionTest extends BaseTest {
+public class PluginServiceTest extends BaseTest {
@Autowired
- CreateBusinessWorkflowDefinition createBusinessWorkflowDefinition;
+ PluginService pluginService;
@Test
- public void testDefineProcess() {
- Process process = createBusinessWorkflowDefinition.defineProcess();
- Assert.assertTrue("Business Resource Creation".equals(process.getType()));
- Assert.assertTrue(process.getTaskByName("createHiveTableTask") != null);
- Assert.assertTrue(process.getNameToTaskMap().size() == 6);
+ public void testReloadPlugin() {
+ String path = this.getClass().getClassLoader().getResource("").getPath();
+ pluginService.setPluginLoc(path + "plugins");
+ pluginService.pluginReload();
+ List<Plugin> pluginList = pluginService.getPlugins();
+ Assert.assertTrue(pluginList.size() == 1);
+ Assert.assertTrue(pluginList.get(0) instanceof ProcessPlugin);
}
-
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelectorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelectorTest.java
new file mode 100644
index 0000000..3b506af
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableEventSelectorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.hive;
+
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.service.BaseTest;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class CreateHiveTableEventSelectorTest extends BaseTest {
+
+ @Autowired
+ CreateHiveTableEventSelector createHiveTableEventSelector;
+
+ @Test
+ public void testAccept() {
+ WorkflowContext workflowContext = new WorkflowContext();
+ BusinessResourceWorkflowForm processForm = new BusinessResourceWorkflowForm();
+ workflowContext.setProcessForm(processForm);
+ Assert.assertFalse(createHiveTableEventSelector.accept(workflowContext)); // setBusinessInfo not called yet
+ processForm.setBusinessInfo(new BusinessInfo());
+ Assert.assertFalse(createHiveTableEventSelector.accept(workflowContext)); // BusinessInfo without a group id set
+ processForm.getBusinessInfo().setInlongGroupId("test");
+ Assert.assertTrue(createHiveTableEventSelector.accept(workflowContext)); // group id "test" is now set
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactoryTest.java
new file mode 100644
index 0000000..81a08da
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/TaskEventListenerFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow;
+
+import java.util.List;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.event.task.QueueOperateListener;
+import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.service.BaseTest;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class TaskEventListenerFactoryTest extends BaseTest {
+
+ @Autowired
+ TaskEventListenerFactory taskEventListenerFactory;
+
+ @Test
+ public void testGetQueueOperateListener() {
+ WorkflowContext context = new WorkflowContext();
+ BusinessResourceWorkflowForm processForm = new BusinessResourceWorkflowForm();
+ BusinessInfo businessInfo = new BusinessInfo();
+ // Pulsar middleware: expect the resource listener first, then the group listener
+ businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
+ processForm.setBusinessInfo(businessInfo);
+ context.setProcessForm(processForm);
+ List<QueueOperateListener> queueOperateListeners = taskEventListenerFactory.getQueueOperateListener(context);
+ Assert.assertEquals(2, queueOperateListeners.size());
+ Assert.assertTrue(queueOperateListeners.get(0) instanceof CreatePulsarResourceTaskListener);
+ Assert.assertTrue(queueOperateListeners.get(1) instanceof CreatePulsarGroupTaskListener);
+ // Tube middleware: same context re-queried after switching the type; expect topic listener, then group listener
+ businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_TUBE);
+ queueOperateListeners = taskEventListenerFactory.getQueueOperateListener(context);
+ Assert.assertEquals(2, queueOperateListeners.size());
+ Assert.assertTrue(queueOperateListeners.get(0) instanceof CreateTubeTopicTaskListener);
+ Assert.assertTrue(queueOperateListeners.get(1) instanceof CreateTubeGroupTaskListener);
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
index ea3de03..754da1d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinitionTest.java
@@ -17,9 +17,9 @@
package org.apache.inlong.manager.service.workflow.business;
-import org.apache.inlong.manager.service.core.BaseTest;
-import org.junit.Assert;
import org.apache.inlong.manager.common.model.definition.Process;
+import org.apache.inlong.manager.service.BaseTest;
+import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/inlong-manager/manager-service/src/test/resources/application-test.properties b/inlong-manager/manager-service/src/test/resources/application-test.properties
index 5e56f81..7603e24 100644
--- a/inlong-manager/manager-service/src/test/resources/application-test.properties
+++ b/inlong-manager/manager-service/src/test/resources/application-test.properties
@@ -19,10 +19,12 @@
# Log level
logging.level.root=INFO
logging.level.org.apache.inlong.manager=debug
-spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
+spring.datasource.druid.url=jdbc:h2:mem:test;MODE=MYSQL;DB_CLOSE_DELAY=-1;IGNORECASE=TRUE;
spring.datasource.druid.username=root
-spring.datasource.druid.password=inlong
-spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
+spring.datasource.druid.password=""
+spring.datasource.druid.driver-class-name=org.h2.Driver
+spring.datasource.schema=classpath:sql/apache_inlong_manager.sql
+spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.druid.validationQuery=SELECT 'x'
# Initialization size, minimum, maximum
spring.datasource.druid.initialSize=20
diff --git a/inlong-manager/manager-service/src/test/resources/plugins/inlong-manager-plugin.jar b/inlong-manager/manager-service/src/test/resources/plugins/inlong-manager-plugin.jar
new file mode 100644
index 0000000..2e52e83
Binary files /dev/null and b/inlong-manager/manager-service/src/test/resources/plugins/inlong-manager-plugin.jar differ
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
index 2e4e1b3..dddd183 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
@@ -17,7 +17,11 @@
package org.apache.inlong.manager.web.controller.openapi;
-import org.apache.inlong.manager.dao.entity.SortClusterConfgiEntity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
@@ -33,10 +37,6 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.context.WebApplicationContext;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
@RunWith(SpringRunner.class)
@SpringBootTest
public class SortControllerTest {
@@ -107,8 +107,8 @@ public class SortControllerTest {
.build();
}
- private SortClusterConfgiEntity prepareClusterConfigEntity(String taskName, String sinkType) {
- return SortClusterConfgiEntity.builder()
+ private SortClusterConfigEntity prepareClusterConfigEntity(String taskName, String sinkType) {
+ return SortClusterConfigEntity.builder()
.clusterName("testCluster")
.taskName(taskName)
.sinkType(sinkType)
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 955f80a..58945cc 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -264,7 +264,11 @@
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.jsr310.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>