You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/07/20 12:32:58 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #367] Refactor
connector (#433)
This is an automated email from the ASF dual-hosted git repository.
chenguangsheng pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new 7850620 [ISSUE #367] Refactor connector (#433)
7850620 is described below
commit 7850620da2909b4b9b47916aebc355bcc3549502
Author: Wenjun Ruan <86...@qq.com>
AuthorDate: Tue Jul 20 20:32:49 2021 +0800
[ISSUE #367] Refactor connector (#433)
* [ISSUE #367] Refactor connector
1.support load plugin from eventMeshPluginDir
2.remove connector plugin from runtime
* Add docs of plugin
* Remove connector-api in connector-plugin
---
build.gradle | 48 +-------
.../eventmesh-runtime-quickstart.zh-CN.md | 14 ++-
.../instructions/eventmesh-runtime-quickstart.md | 16 ++-
docs/images/project-structure.png | Bin 54777 -> 141844 bytes
.../build.gradle | 37 +++---
.../eventmesh-connector-api}/build.gradle | 2 +-
.../eventmesh-connector-api}/gradle.properties | 0
.../org/apache/eventmesh/api/AbstractContext.java | 0
.../org/apache/eventmesh/api/EventMeshAction.java | 0
.../api/EventMeshAsyncConsumeContext.java | 0
.../java/org/apache/eventmesh/api/RRCallback.java | 0
.../eventmesh/api/consumer/MeshMQPushConsumer.java | 0
.../api/factory/ConnectorPluginFactory.java | 19 ++-
.../eventmesh/api/producer/MeshMQProducer.java | 0
.../eventmesh-connector-rocketmq}/build.gradle | 4 +-
.../gradle.properties | 0
.../rocketmq/MessagingAccessPointImpl.java | 0
.../ConsumeMessageConcurrentlyService.java | 5 +-
.../connector/rocketmq/common/Constants.java | 0
.../rocketmq/common/EventMeshConstants.java | 0
.../connector/rocketmq/config/ClientConfig.java | 0
.../rocketmq/config/ClientConfiguration.java | 0
.../rocketmq/config/ConfigurationWrapper.java | 2 +-
.../rocketmq/consumer/PushConsumerImpl.java | 0
.../rocketmq/consumer/RocketMQConsumerImpl.java | 2 +-
.../connector/rocketmq/domain/ConsumeRequest.java | 0
.../connector/rocketmq/domain/NonStandardKeys.java | 0
.../rocketmq/domain/RocketMQConstants.java | 0
.../patch/EventMeshConsumeConcurrentlyContext.java | 0
.../patch/EventMeshConsumeConcurrentlyStatus.java | 0
.../EventMeshMessageListenerConcurrently.java | 0
.../rocketmq/producer/AbstractOMSProducer.java | 0
.../connector/rocketmq/producer/ProducerImpl.java | 0
.../rocketmq/producer/RocketMQProducerImpl.java | 0
.../connector/rocketmq/promise/DefaultPromise.java | 0
.../connector/rocketmq/promise/FutureState.java | 0
.../connector/rocketmq/utils/BeanUtils.java | 0
.../connector/rocketmq/utils/OMSUtil.java | 0
...pache.eventmesh.api.consumer.MeshMQPushConsumer | 0
...rg.apache.eventmesh.api.producer.MeshMQProducer | 0
.../rocketmq/consumer/PushConsumerImplTest.java | 1 -
.../apache/rocketmq/producer/ProducerImplTest.java | 0
.../rocketmq/promise/DefaultPromiseTest.java | 0
.../org/apache/rocketmq/utils/BeanUtilsTest.java | 0
...rg.apache.io.openmessaging.MessagingAccessPoint | 0
.../org.apache.io.openmessaging.producer.Producer | 0
.../gradle.properties | 0
...pache.eventmesh.api.consumer.MeshMQPushConsumer | 16 ---
...rg.apache.eventmesh.api.producer.MeshMQProducer | 16 ---
eventmesh-runtime/build.gradle | 4 +-
.../runtime/core/plugin/MQConsumerWrapper.java | 17 +--
.../runtime/core/plugin/MQProducerWrapper.java | 18 +--
.../protocol/http/consumer/HandleMsgContext.java | 1 -
.../tcp/client/group/ClientGroupWrapper.java | 1 -
.../client/session/push/DownStreamMsgContext.java | 1 -
.../eventmesh/spi/EventMeshExtensionFactory.java | 83 ++++++++++++-
.../eventmesh/spi/EventMeshExtensionLoader.java | 124 --------------------
.../eventmesh/spi/loader/ExtensionClassLoader.java | 22 +++-
.../spi/loader/JarExtensionClassLoader.java | 130 +++++++++++++++++++++
.../spi/loader/MetaInfExtensionClassLoader.java | 92 +++++++++++++++
eventmesh-starter/build.gradle | 4 +-
eventmesh-test/build.gradle | 8 +-
install.sh | 4 +-
settings.gradle | 4 +-
style/checkStyle-suppressions.xml | 25 ++++
style/checkStyle.xml | 11 ++
66 files changed, 441 insertions(+), 290 deletions(-)
diff --git a/build.gradle b/build.gradle
index 1ff8edf..1658b84 100644
--- a/build.gradle
+++ b/build.gradle
@@ -188,45 +188,6 @@ subprojects {
}
}
-// checkstyle {
-// toolVersion = "8.32"
-// ignoreFailures = true
-// sourceSets = [sourceSets.main]
-// configFile = '../style/codeStyle.xml' as File
-// showViolations true
-// }
-//
-// tasks.withType(Checkstyle) {
-// reports {
-// xml.enabled false
-// html.enabled true
-// }
-// }
-//
-// sourceSets {
-// main {
-// java {
-// srcDir 'src/main/java'
-// }
-//
-// resources {
-// srcDir 'src/main/resources'
-// }
-//
-// }
-//
-// test {
-// java {
-// srcDir 'src/test/java'
-// }
-//
-// resources {
-// srcDir 'src/test/resources'
-// }
-//
-// }
-// }
-
spotbugs {
//toolVersion = '4.2.3'
ignoreFailures = true
@@ -271,14 +232,6 @@ subprojects {
}
}
-// tasks.withType(Pmd) {
-// reports {
-// xml.enabled = false
-// html.enabled = true
-// }
-// }
-
-
pmd {
consoleOutput = true
toolVersion = "6.23.0"
@@ -314,6 +267,7 @@ subprojects {
from project.jar.getArchivePath()
exclude 'eventmesh-common*.jar'
exclude 'eventmesh-connector-api*.jar'
+ exclude 'eventmesh-connector-plugin*.jar'
exclude 'eventmesh-starter*.jar'
exclude 'eventmesh-test*.jar'
exclude 'eventmesh-sdk*.jar'
diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
index 6ae41fa..1bc1aa8 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
@@ -20,7 +20,7 @@ Gradle至少为7.0, 推荐 7.0.*
```$ xslt
unzip EventMesh-master.zip
cd / *您的部署路径* /EventMesh-master
-gradle clean dist tar -x test
+gradle clean dist copyConnectorPlugin tar -x test
```
您将在目录/ *您的部署路径* /EventMesh-master/eventmesh-runtime/dist中获得**eventmesh-runtime_1.0.0.tar.gz**
@@ -57,14 +57,20 @@ sh start.sh
![project-structure](../../images/project-structure.png)
- eventmesh-common : eventmesh公共类与方法模块
-- eventmesh-connector-api : eventmesh插件接口定义模块
-- eventmesh-connector-rocketmq : eventmesh rocketmq插件模块
+- eventmesh-connector-api : eventmesh connector插件接口定义模块
+- eventmesh-connector-plugin : eventmesh connector插件模块
- eventmesh-runtime : eventmesh运行时模块
- eventmesh-sdk-java : eventmesh java客户端sdk
- eventmesh-starter : eventmesh本地启动运行项目入口
- eventmesh-spi : eventmesh SPI加载模块
-> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
+> 注:插件模块遵循eventmesh定义的SPI规范, 自定义的SPI接口需要使用注解@EventMeshSPI标识.
+> 插件实例需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件,文件名为SPI接口全类名.
+> 文件内容为插件实例名到插件实例的映射, 具体可以参考eventmesh-connector-rocketmq插件模块
+
+插件可以从classpath和插件目录下面加载. 在本地开发阶段可以将使用的插件在eventmesh-starter模块build.gradle中进行声明,或者执行gradle的copyConnectorPlugin任务
+将插件拷贝至dist/plugin目录下, eventmesh默认会加载项目下dist/plugin目录下的插件, 加载目录可以通过-DeventMeshPluginDir=your_plugin_directory来改变插件目录.
+运行时需要使用的插件实例可以在eventmesh.properties中进行配置
**2.3.2 配置插件**
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index 62b315c..d47d5d3 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -20,7 +20,7 @@ You will get **EventMesh-master.zip**
```$xslt
unzip EventMesh-master.zip
cd /*YOUR DEPLOY PATH*/EventMesh-master
-gradle clean dist tar -x test
+gradle clean dist copyConnectorPlugin tar -x test
```
You will get **EventMesh_1.2.0.tar.gz** in directory /* YOUR DEPLOY PATH */EventMesh-master/build
@@ -58,14 +58,22 @@ Same with 1.2
- eventmesh-common : eventmesh common classes and method module
- eventmesh-connector-api : eventmesh connector api definition module
-- eventmesh-connector-rocketmq : eventmesh rocketmq connector module
+- eventmesh-connector-plugin : eventmesh connector plugin instance module
- eventmesh-runtime : eventmesh runtime module
- eventmesh-sdk-java : eventmesh java client sdk
- eventmesh-starter : eventmesh project local start entry
- eventmesh-spi : eventmesh SPI load module
-> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of
-related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module
+> ps: The plugin module follows the eventmesh SPI specification, custom SPI interface need to be identified with the @EventMeshSPI annotation.
+> The plugin instance needs to be configured in corresponding module under /main/resources/meta-inf/eventmesh with the mapping file of
+> related interface and implementation class. The content of the file is a mapping of plugin instance name to plugin instance, you can find more
+> detail in eventmesh-connector-rocketmq module
+
+The plugin can be loaded from classpath and plugin directory. In local develop, you can declare the used plugins in build.gradle of eventmesh-starter module,
+or execute copyConnectorPlugin task of gradle to copy the plugin instance jar to dist/plugin directory. By default, eventmesh will load the plugins in project's
+dist/plugin, this can be changed by add -DeventMeshPluginDir=your_plugin_directory.
+The plugin instance need to be used at runtime can be configured in eventmesh.properties.
+
**2.3.2 Configure plugin**
diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png
index efc5249..2305a4e 100644
Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java b/eventmesh-connector-plugin/build.gradle
similarity index 54%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
copy to eventmesh-connector-plugin/build.gradle
index c7e4e7f..be1a227 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
+++ b/eventmesh-connector-plugin/build.gradle
@@ -14,27 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eventmesh.api;
-import io.openmessaging.api.Action;
-import io.openmessaging.api.AsyncConsumeContext;
-
-public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {
-
- private AbstractContext abstractContext;
-
- public AbstractContext getAbstractContext() {
- return abstractContext;
+task copyConnectorPlugin(dependsOn: ['jar']) {
+ doFirst {
+ new File(projectDir, '../eventmesh-connector-plugin/dist/apps').mkdir()
+ new File(projectDir, '../dist/plugin/connector').mkdirs()
}
-
- public void setAbstractContext(AbstractContext abstractContext) {
- this.abstractContext = abstractContext;
- }
-
- public abstract void commit(EventMeshAction action);
-
- @Override
- public void commit(Action action) {
- throw new UnsupportedOperationException("not support yet");
+ doLast {
+ copy {
+ into('../eventmesh-connector-plugin/dist/apps/')
+ from project.jar.getArchivePath()
+ exclude {
+ "eventmesh-connector-plugin-${version}.jar"
+ "eventmesh-connector-api-${version}.jar"
+ }
+ }
+ copy {
+ into '../dist/plugin/connector'
+ from "../eventmesh-connector-plugin/dist/apps/eventmesh-connector-rocketmq-${version}.jar"
+ }
}
}
\ No newline at end of file
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
similarity index 91%
rename from eventmesh-connector-api/build.gradle
rename to eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
index 157048e..5389a80 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
@@ -20,6 +20,6 @@ List open_message = [
]
dependencies {
- implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
+ api open_message, project(":eventmesh-common"), project(":eventmesh-spi")
testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
}
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
similarity index 100%
copy from eventmesh-connector-api/gradle.properties
copy to eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
similarity index 75%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
index b114953..cff0c8e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
@@ -17,18 +17,33 @@
* under the License.
*/
-package org.apache.eventmesh.runtime.core.plugin;
+package org.apache.eventmesh.api.factory;
import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
-public class PluginFactory {
+/**
+ * The factory to get connector {@link MeshMQProducer} and {@link MeshMQPushConsumer}
+ */
+public class ConnectorPluginFactory {
+ /**
+ * Get MeshMQProducer instance by plugin name
+ *
+ * @param connectorPluginName plugin name
+ * @return MeshMQProducer instance
+ */
public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
}
+ /**
+ * Get MeshMQPushConsumer instance by plugin name
+ *
+ * @param connectorPluginName plugin name
+ * @return MeshMQPushConsumer instance
+ */
public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
}
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
diff --git a/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
similarity index 90%
rename from eventmesh-connector-rocketmq/build.gradle
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
index 6126b50..03830a2 100644
--- a/eventmesh-connector-rocketmq/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
@@ -55,6 +55,6 @@ List open_message = [
]
dependencies {
- implementation rocketmq, metrics,open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
- testImplementation rocketmq, metrics,open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
+ api rocketmq, metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation rocketmq, metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
}
diff --git a/eventmesh-connector-rocketmq/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/gradle.properties
similarity index 100%
rename from eventmesh-connector-rocketmq/gradle.properties
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/gradle.properties
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
similarity index 98%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 95c2fc9..7111ca9 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.eventmesh.connector.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,6 +38,9 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
similarity index 99%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
index b1c2732..9334b5f 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
@@ -73,4 +73,4 @@ public class ConfigurationWrapper {
public String getProp(String key) {
return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
}
-}
\ No newline at end of file
+}
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
similarity index 98%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
index 7087739..9d0d347 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
@@ -40,7 +40,7 @@ import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;
import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext;
import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
-import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
+import org.apache.eventmesh.connector.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
similarity index 100%
copy from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
copy to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
similarity index 99%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
index 1140535..86a5ffd 100644
--- a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
@@ -23,7 +23,6 @@ import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Properties;
-import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Consumer;
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java
diff --git a/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint
diff --git a/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-plugin/gradle.properties
similarity index 100%
rename from eventmesh-connector-api/gradle.properties
rename to eventmesh-connector-plugin/gradle.properties
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
deleted file mode 100644
index c98880a..0000000
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
deleted file mode 100644
index 28907ca..0000000
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index e5bb065..00779a7 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -31,6 +31,6 @@ List open_message = [
dependencies {
- implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
- testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
+ implementation metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index b09f9ed..68ea786 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -19,13 +19,13 @@ package org.apache.eventmesh.runtime.core.plugin;
import java.util.List;
import java.util.Properties;
-import java.util.ServiceLoader;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ public class MQConsumerWrapper extends MQWrapper {
protected MeshMQPushConsumer meshMQPushConsumer;
public MQConsumerWrapper(String connectorPluginType) {
- this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
+ this.meshMQPushConsumer = ConnectorPluginFactory.getMeshMQPushConsumer(connectorPluginType);
if (meshMQPushConsumer == null) {
logger.error("can't load the meshMQPushConsumer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
@@ -52,24 +52,11 @@ public class MQConsumerWrapper extends MQWrapper {
}
public synchronized void init(Properties keyValue) throws Exception {
- meshMQPushConsumer = getMeshMQPushConsumer();
- if (meshMQPushConsumer == null) {
- logger.error("can't load the meshMQPushConsumer plugin, please check.");
- throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
- }
meshMQPushConsumer.init(keyValue);
inited.compareAndSet(false, true);
}
- private MeshMQPushConsumer getMeshMQPushConsumer() {
- ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class);
- if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
- return meshMQPushConsumerServiceLoader.iterator().next();
- }
- return null;
- }
-
public synchronized void start() throws Exception {
meshMQPushConsumer.start();
started.compareAndSet(false, true);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
index 2e55fbf..f979dc4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
@@ -18,12 +18,12 @@
package org.apache.eventmesh.runtime.core.plugin;
import java.util.Properties;
-import java.util.ServiceLoader;
import io.openmessaging.api.Message;
import io.openmessaging.api.SendCallback;
import org.apache.eventmesh.api.RRCallback;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public class MQProducerWrapper extends MQWrapper {
protected MeshMQProducer meshMQProducer;
public MQProducerWrapper(String connectorPluginType) {
- this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
+ this.meshMQProducer = ConnectorPluginFactory.getMeshMQProducer(connectorPluginType);
if (meshMQProducer == null) {
logger.error("can't load the meshMQProducer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
@@ -47,24 +47,10 @@ public class MQProducerWrapper extends MQWrapper {
return;
}
- meshMQProducer = getSpiMeshMQProducer();
- if (meshMQProducer == null) {
- logger.error("can't load the meshMQProducer plugin, please check.");
- throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
- }
-
meshMQProducer.init(keyValue);
inited.compareAndSet(false, true);
}
- private MeshMQProducer getSpiMeshMQProducer() {
- ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class);
- if (meshMQProducerServiceLoader.iterator().hasNext()) {
- return meshMQProducerServiceLoader.iterator().next();
- }
- return null;
- }
-
public synchronized void start() throws Exception {
if (started.get()) {
return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
index 2d19174..3ce124c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 310ea6d..946cc8f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -50,7 +50,6 @@ import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
-import org.apache.eventmesh.runtime.core.plugin.PluginFactory;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 8e441bd..712e6fa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
index 6aea9db..6388809 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
@@ -18,11 +18,42 @@
package org.apache.eventmesh.spi;
import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.spi.loader.ExtensionClassLoader;
+import org.apache.eventmesh.spi.loader.JarExtensionClassLoader;
+import org.apache.eventmesh.spi.loader.MetaInfExtensionClassLoader;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The extension fetching factory, all extension plugins should be fetched by this factory.
+ * And all the extension plugins defined in eventmesh should have {@link EventMeshSPI} annotation.
+ */
public enum EventMeshExtensionFactory {
;
+ private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionFactory.class);
+
+ private static final List<ExtensionClassLoader> extensionClassLoaders = new ArrayList<>();
+
+ static {
+ extensionClassLoaders.add(new MetaInfExtensionClassLoader());
+ extensionClassLoaders.add(new JarExtensionClassLoader());
+ }
+
+ private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE =
+ new ConcurrentHashMap<>(16);
+
+ /**
+ * @param extensionType extension plugin class type
+ * @param extensionName extension instance name
+ * @param <T> the type of the plugin
+ * @return plugin instance
+ */
public static <T> T getExtension(Class<T> extensionType, String extensionName) {
if (extensionType == null) {
throw new ExtensionException("extensionType is null");
@@ -33,6 +64,56 @@ public enum EventMeshExtensionFactory {
if (!extensionType.isInterface() || !extensionType.isAnnotationPresent(EventMeshSPI.class)) {
throw new ExtensionException(String.format("extensionType:%s is invalided", extensionType));
}
- return EventMeshExtensionLoader.getExtension(extensionType, extensionName);
+ EventMeshSPI eventMeshSPIAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
+ if (eventMeshSPIAnnotation.isSingleton()) {
+ return getSingletonExtension(extensionType, extensionName);
+ }
+ return getPrototypeExtension(extensionType, extensionName);
}
+
+ @SuppressWarnings("unchecked")
+ private static <T> T getSingletonExtension(Class<T> extensionType, String extensionName) {
+ return (T) EXTENSION_INSTANCE_CACHE.computeIfAbsent(extensionName, name -> {
+ Class<T> extensionInstanceClass = getExtensionClass(extensionType, extensionName);
+ try {
+ if (extensionInstanceClass == null) {
+ return null;
+ }
+ T extensionInstance = extensionInstanceClass.newInstance();
+ logger.info("initialize extension instance success, extensionType: {}, extensionName: {}",
+ extensionType, extensionName);
+ return extensionInstance;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new ExtensionException("Extension initialize error", e);
+ }
+ });
+ }
+
+ private static <T> T getPrototypeExtension(Class<T> extensionType, String extensionName) {
+ Class<T> extensionInstanceClass = getExtensionClass(extensionType, extensionName);
+ try {
+ if (extensionInstanceClass == null) {
+ return null;
+ }
+ T extensionInstance = extensionInstanceClass.newInstance();
+ logger.info("initialize extension instance success, extensionType: {}, extensionName: {}",
+ extensionType, extensionName);
+ return extensionInstance;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new ExtensionException("Extension initialize error", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<T> getExtensionClass(Class<T> extensionType, String extensionName) {
+ for (ExtensionClassLoader extensionClassLoader : extensionClassLoaders) {
+ Map<String, Class<?>> extensionClassMap = extensionClassLoader.loadExtensionClass(extensionType);
+ Class<?> instanceClass = extensionClassMap.get(extensionName);
+ if (instanceClass != null) {
+ return (Class<T>) instanceClass;
+ }
+ }
+ return null;
+ }
+
}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
deleted file mode 100644
index 9531bc9..0000000
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.spi;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-public enum EventMeshExtensionLoader {
- ;
-
- private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class);
-
- private static final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, Class<?>>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16);
-
- private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
-
- private static final String EVENTMESH_EXTENSION_DIR = "META-INF/eventmesh/";
-
- @SuppressWarnings("unchecked")
- public static <T> T getExtension(Class<T> extensionType, String extensionName) {
- if (!hasLoadExtensionClass(extensionType)) {
- loadExtensionClass(extensionType);
- }
- if (!hasInitializeExtension(extensionName)) {
- T instance = initializeExtension(extensionType, extensionName);
- EventMeshSPI spiAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
- if (!spiAnnotation.isSingleton()) {
- return instance;
- }
- EXTENSION_INSTANCE_CACHE.put(extensionName, instance);
- }
- return (T) EXTENSION_INSTANCE_CACHE.get(extensionName);
- }
-
- @SuppressWarnings("unchecked")
- private static <T> T initializeExtension(Class<T> extensionType, String extensionName) {
- ConcurrentHashMap<String, Class<?>> extensionClassMap = EXTENSION_CLASS_LOAD_CACHE.get(extensionType);
- if (extensionClassMap == null) {
- throw new ExtensionException(String.format("Extension type:%s has not been loaded", extensionType));
- }
- if (!extensionClassMap.containsKey(extensionName)) {
- throw new ExtensionException(String.format("Extension name:%s has not been loaded", extensionName));
- }
- Class<?> aClass = extensionClassMap.get(extensionName);
- try {
- Object extensionObj = aClass.newInstance();
- logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName);
- return (T) extensionObj;
- } catch (InstantiationException | IllegalAccessException e) {
- throw new ExtensionException("Extension initialize error", e);
- }
- }
-
- public static <T> void loadExtensionClass(Class<T> extensionType) {
- String extensionFileName = EVENTMESH_EXTENSION_DIR + extensionType.getName();
- ClassLoader classLoader = EventMeshExtensionLoader.class.getClassLoader();
- try {
- Enumeration<URL> extensionUrls = classLoader.getResources(extensionFileName);
- if (extensionUrls != null) {
- while (extensionUrls.hasMoreElements()) {
- URL url = extensionUrls.nextElement();
- loadResources(url, extensionType);
- }
- }
- } catch (IOException e) {
- throw new ExtensionException("load extension class error", e);
- }
-
-
- }
-
- private static <T> void loadResources(URL url, Class<T> extensionType) throws IOException {
- try (InputStream inputStream = url.openStream()) {
- Properties properties = new Properties();
- properties.load(inputStream);
- properties.forEach((extensionName, extensionClass) -> {
- String extensionNameStr = (String) extensionName;
- String extensionClassStr = (String) extensionClass;
- try {
- Class<?> targetClass = Class.forName(extensionClassStr);
- logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass);
- if (!extensionType.isAssignableFrom(targetClass)) {
- throw new ExtensionException(
- String.format("class: %s is not subClass of %s", targetClass, extensionType));
- }
- EXTENSION_CLASS_LOAD_CACHE.computeIfAbsent(extensionType, k -> new ConcurrentHashMap<>())
- .put(extensionNameStr, targetClass);
- } catch (ClassNotFoundException e) {
- throw new ExtensionException("load extension class error", e);
- }
- });
- }
- }
-
- private static <T> boolean hasLoadExtensionClass(Class<T> extensionType) {
- return EXTENSION_CLASS_LOAD_CACHE.containsKey(extensionType);
- }
-
- private static boolean hasInitializeExtension(String extensionName) {
- return EXTENSION_INSTANCE_CACHE.containsKey(extensionName);
- }
-}
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java
similarity index 62%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
rename to eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java
index 063f0f3..feca359 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java
@@ -15,13 +15,25 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.rocketmq.domain;
+package org.apache.eventmesh.spi.loader;
-public interface RocketMQConstants {
+import java.util.Map;
+
+/**
+ * Load extension class
+ * <ul>
+ * <li>{@link MetaInfExtensionClassLoader}</li>
+ * <li>{@link JarExtensionClassLoader}</li>
+ * </ul>
+ */
+public interface ExtensionClassLoader {
/**
- * Key of scheduled message delivery time
+ * load
+ *
+ * @param extensionType extension type class
+ * @param <T> extension type
+ * @return extension instance name to extension instance class
*/
- String START_DELIVER_TIME = "__STARTDELIVERTIME";
-
+ <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType);
}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
new file mode 100644
index 0000000..7d591a8
--- /dev/null
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.spi.loader;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.eventmesh.spi.ExtensionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load extension from '${eventMeshPluginDir}', the default loading directory is './dist/plugin'
+ */
+public class JarExtensionClassLoader implements ExtensionClassLoader {
+
+ private static final Logger logger = LoggerFactory.getLogger(JarExtensionClassLoader.class);
+
+ private static final ConcurrentHashMap<Class<?>, Map<String, Class<?>>> EXTENSION_CLASS_CACHE =
+ new ConcurrentHashMap<>(16);
+
+ private static final String EVENTMESH_EXTENSION_PLUGIN_DIR = System.getProperty("eventMeshPluginDir",
+ "./dist/plugin");
+
+ private static final String EVENTMESH_EXTENSION_META_DIR = "META-INF/eventmesh/";
+
+ @Override
+ public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType) {
+ return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, this::doLoadExtensionClass);
+ }
+
+ private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType) {
+ Map<String, Class<?>> extensionMap = new HashMap<>();
+
+ List<URL> pluginJarPaths = loadJarPathFromResource(EVENTMESH_EXTENSION_PLUGIN_DIR);
+ if (CollectionUtils.isEmpty(pluginJarPaths)) {
+ return extensionMap;
+ }
+
+ String extensionFileName = EVENTMESH_EXTENSION_META_DIR + extensionType.getName();
+ URLClassLoader urlClassLoader = URLClassLoader.newInstance(pluginJarPaths.toArray(new URL[0]));
+ try {
+ Enumeration<URL> extensionUrls = urlClassLoader.getResources(extensionFileName);
+ if (extensionUrls != null) {
+ while (extensionUrls.hasMoreElements()) {
+ URL url = extensionUrls.nextElement();
+ extensionMap.putAll(loadResources(urlClassLoader, url, extensionType));
+ }
+ }
+ } catch (IOException e) {
+ throw new ExtensionException("load extension class error", e);
+ }
+ return extensionMap;
+ }
+
+ private List<URL> loadJarPathFromResource(String pluginPath) {
+ File plugin = new File(pluginPath);
+ if (!plugin.exists()) {
+ logger.warn("plugin dir:{} is not exist", pluginPath);
+ return Lists.newArrayList();
+ }
+ if (plugin.isFile() && plugin.getName().endsWith(".jar")) {
+ try {
+ return Lists.newArrayList(plugin.toURI().toURL());
+ } catch (Exception e) {
+ throw new ExtensionException(e);
+ }
+ }
+ File[] files = plugin.listFiles();
+ List<URL> pluginUrls = new ArrayList<>();
+ if (files != null) {
+ for (File file : files) {
+ pluginUrls.addAll(loadJarPathFromResource(file.getPath()));
+ }
+ }
+ return pluginUrls;
+ }
+
+ private static <T> Map<String, Class<?>> loadResources(URLClassLoader urlClassLoader, URL url, Class<T> extensionType) throws IOException {
+ Map<String, Class<?>> extensionMap = new HashMap<>();
+ try (InputStream inputStream = url.openStream()) {
+ Properties properties = new Properties();
+ properties.load(inputStream);
+ properties.forEach((extensionName, extensionClass) -> {
+ String extensionNameStr = (String) extensionName;
+ String extensionClassStr = (String) extensionClass;
+ try {
+ Class<?> targetClass = urlClassLoader.loadClass(extensionClassStr);
+ logger.info("load extension class success, extensionType: {}, extensionClass: {}",
+ extensionType, targetClass);
+ if (!extensionType.isAssignableFrom(targetClass)) {
+ throw new ExtensionException(
+ String.format("class: %s is not subClass of %s", targetClass, extensionType));
+ }
+ extensionMap.put(extensionNameStr, targetClass);
+ } catch (ClassNotFoundException e) {
+ throw new ExtensionException("load extension class error", e);
+ }
+ });
+ }
+ return extensionMap;
+ }
+}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java
new file mode 100644
index 0000000..59d45e1
--- /dev/null
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.spi.loader;
+
+import org.apache.eventmesh.spi.ExtensionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load extension from classpath
+ */
+public class MetaInfExtensionClassLoader implements ExtensionClassLoader {
+
+ private static final Logger logger = LoggerFactory.getLogger(MetaInfExtensionClassLoader.class);
+
+ private static final ConcurrentHashMap<Class<?>, Map<String, Class<?>>> EXTENSION_CLASS_CACHE =
+ new ConcurrentHashMap<>(16);
+
+ private static final String EVENTMESH_EXTENSION_META_DIR = "META-INF/eventmesh/";
+
+ @Override
+ public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType) {
+ return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, this::doLoadExtensionClass);
+ }
+
+ private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType) {
+ Map<String, Class<?>> extensionMap = new HashMap<>();
+ String extensionFileName = EVENTMESH_EXTENSION_META_DIR + extensionType.getName();
+ ClassLoader classLoader = MetaInfExtensionClassLoader.class.getClassLoader();
+ try {
+ Enumeration<URL> extensionUrls = classLoader.getResources(extensionFileName);
+ if (extensionUrls != null) {
+ while (extensionUrls.hasMoreElements()) {
+ URL url = extensionUrls.nextElement();
+ extensionMap.putAll(loadResources(url, extensionType));
+ }
+ }
+ } catch (IOException e) {
+ throw new ExtensionException("load extension class error", e);
+ }
+ return extensionMap;
+ }
+
+ private static <T> Map<String, Class<?>> loadResources(URL url, Class<T> extensionType) throws IOException {
+ Map<String, Class<?>> extensionMap = new HashMap<>();
+ try (InputStream inputStream = url.openStream()) {
+ Properties properties = new Properties();
+ properties.load(inputStream);
+ properties.forEach((extensionName, extensionClass) -> {
+ String extensionNameStr = (String) extensionName;
+ String extensionClassStr = (String) extensionClass;
+ try {
+ Class<?> targetClass = Class.forName(extensionClassStr);
+ logger.info("load extension class success, extensionType: {}, extensionClass: {}",
+ extensionType, targetClass);
+ if (!extensionType.isAssignableFrom(targetClass)) {
+ throw new ExtensionException(
+ String.format("class: %s is not subClass of %s", targetClass, extensionType));
+ }
+ extensionMap.put(extensionNameStr, targetClass);
+ } catch (ClassNotFoundException e) {
+ throw new ExtensionException("load extension class error", e);
+ }
+ });
+ }
+ return extensionMap;
+ }
+}
diff --git a/eventmesh-starter/build.gradle b/eventmesh-starter/build.gradle
index 7b81c6b..48aafcb 100644
--- a/eventmesh-starter/build.gradle
+++ b/eventmesh-starter/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- implementation project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
- testImplementation project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
+ implementation project(":eventmesh-runtime")
+ testImplementation project(":eventmesh-runtime")
//testImplementation group: 'junit', name: 'junit', version: '4.12'
}
\ No newline at end of file
diff --git a/eventmesh-test/build.gradle b/eventmesh-test/build.gradle
index 567374f..8844c4c 100644
--- a/eventmesh-test/build.gradle
+++ b/eventmesh-test/build.gradle
@@ -25,8 +25,12 @@ List spring_framework = [
dependencies {
// compile log4j2, sl4j
// testCompile log4j2, sl4j
- implementation project(":eventmesh-sdk-java"), project(":eventmesh-connector-api"), project(":eventmesh-common"), spring_framework
- testImplementation project(":eventmesh-sdk-java"), project(":eventmesh-connector-api"), project(":eventmesh-common"), spring_framework
+ implementation project(":eventmesh-sdk-java")
+ implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ implementation spring_framework
+ testImplementation project(":eventmesh-sdk-java")
+ testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation spring_framework
}
configurations.all {
diff --git a/install.sh b/install.sh
index 19397ac..7acf43d 100644
--- a/install.sh
+++ b/install.sh
@@ -26,7 +26,7 @@
# jar : produce jar
# package tar.gz/zip
-gradle clean -Pdev=true -Pjdk=1.7 dist tar zip
+gradle clean -Pdev=true -Pjdk=1.8 dist tar zip
# package jar
-gradle clean -Pdev=true -Pjdk=1.7 jar
\ No newline at end of file
+gradle clean -Pdev=true -Pjdk=1.8 jar
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 2b5e0af..7c0e70a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,11 +18,11 @@
rootProject.name = 'EventMesh'
String jdkVersion = "${jdk}"
include 'eventmesh-runtime'
-include 'eventmesh-connector-rocketmq'
include 'eventmesh-sdk-java'
include 'eventmesh-common'
-include 'eventmesh-connector-api'
include 'eventmesh-starter'
include 'eventmesh-test'
include 'eventmesh-spi'
+include 'eventmesh-connector-plugin:eventmesh-connector-api'
+include 'eventmesh-connector-plugin:eventmesh-connector-rocketmq'
diff --git a/style/checkStyle-suppressions.xml b/style/checkStyle-suppressions.xml
new file mode 100644
index 0000000..fd4a42f
--- /dev/null
+++ b/style/checkStyle-suppressions.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<suppressions>
+
+</suppressions>
\ No newline at end of file
diff --git a/style/checkStyle.xml b/style/checkStyle.xml
index d8ff7bd..dc98c99 100644
--- a/style/checkStyle.xml
+++ b/style/checkStyle.xml
@@ -21,6 +21,12 @@
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">
+
+ <module name="SuppressionFilter">
+ <property name="file" value="./style/checkStyle-suppressions.xml"/>
+ <property name="optional" value="false"/>
+ </module>
+
<module name="RegexpSingleline">
<property name="format" value="System\..+\.println"/>
<property name="message" value="Prohibit invoking System.*.println in source code !"/>
@@ -44,4 +50,9 @@
<property name="format" value="^(org)\.apache(\.[a-zA-Z][a-zA-Z0-9]*)+$"/>
</module>
</module>
+
+ <module name="BeforeExecutionExclusionFileFilter">
+ <property name="fileNamePattern" value="./eventmesh-runtime/conf/sChat2.jks$"/>
+ </module>
+
</module>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org