You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/07/20 12:32:58 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #367] Refactor connector (#433)

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7850620  [ISSUE #367] Refactor connector (#433)
7850620 is described below

commit 7850620da2909b4b9b47916aebc355bcc3549502
Author: Wenjun Ruan <86...@qq.com>
AuthorDate: Tue Jul 20 20:32:49 2021 +0800

    [ISSUE #367] Refactor connector (#433)
    
    * [ISSUE #367] Refactor connector
        1.support load plugin from eventMeshPluginDir
        2.remove connector plugin from runtime
    
    * Add docs of plugin
    
    * Remove connector-api in connector-plugin
---
 build.gradle                                       |  48 +-------
 .../eventmesh-runtime-quickstart.zh-CN.md          |  14 ++-
 .../instructions/eventmesh-runtime-quickstart.md   |  16 ++-
 docs/images/project-structure.png                  | Bin 54777 -> 141844 bytes
 .../build.gradle                                   |  37 +++---
 .../eventmesh-connector-api}/build.gradle          |   2 +-
 .../eventmesh-connector-api}/gradle.properties     |   0
 .../org/apache/eventmesh/api/AbstractContext.java  |   0
 .../org/apache/eventmesh/api/EventMeshAction.java  |   0
 .../api/EventMeshAsyncConsumeContext.java          |   0
 .../java/org/apache/eventmesh/api/RRCallback.java  |   0
 .../eventmesh/api/consumer/MeshMQPushConsumer.java |   0
 .../api/factory/ConnectorPluginFactory.java        |  19 ++-
 .../eventmesh/api/producer/MeshMQProducer.java     |   0
 .../eventmesh-connector-rocketmq}/build.gradle     |   4 +-
 .../gradle.properties                              |   0
 .../rocketmq/MessagingAccessPointImpl.java         |   0
 .../ConsumeMessageConcurrentlyService.java         |   5 +-
 .../connector/rocketmq/common/Constants.java       |   0
 .../rocketmq/common/EventMeshConstants.java        |   0
 .../connector/rocketmq/config/ClientConfig.java    |   0
 .../rocketmq/config/ClientConfiguration.java       |   0
 .../rocketmq/config/ConfigurationWrapper.java      |   2 +-
 .../rocketmq/consumer/PushConsumerImpl.java        |   0
 .../rocketmq/consumer/RocketMQConsumerImpl.java    |   2 +-
 .../connector/rocketmq/domain/ConsumeRequest.java  |   0
 .../connector/rocketmq/domain/NonStandardKeys.java |   0
 .../rocketmq/domain/RocketMQConstants.java         |   0
 .../patch/EventMeshConsumeConcurrentlyContext.java |   0
 .../patch/EventMeshConsumeConcurrentlyStatus.java  |   0
 .../EventMeshMessageListenerConcurrently.java      |   0
 .../rocketmq/producer/AbstractOMSProducer.java     |   0
 .../connector/rocketmq/producer/ProducerImpl.java  |   0
 .../rocketmq/producer/RocketMQProducerImpl.java    |   0
 .../connector/rocketmq/promise/DefaultPromise.java |   0
 .../connector/rocketmq/promise/FutureState.java    |   0
 .../connector/rocketmq/utils/BeanUtils.java        |   0
 .../connector/rocketmq/utils/OMSUtil.java          |   0
 ...pache.eventmesh.api.consumer.MeshMQPushConsumer |   0
 ...rg.apache.eventmesh.api.producer.MeshMQProducer |   0
 .../rocketmq/consumer/PushConsumerImplTest.java    |   1 -
 .../apache/rocketmq/producer/ProducerImplTest.java |   0
 .../rocketmq/promise/DefaultPromiseTest.java       |   0
 .../org/apache/rocketmq/utils/BeanUtilsTest.java   |   0
 ...rg.apache.io.openmessaging.MessagingAccessPoint |   0
 .../org.apache.io.openmessaging.producer.Producer  |   0
 .../gradle.properties                              |   0
 ...pache.eventmesh.api.consumer.MeshMQPushConsumer |  16 ---
 ...rg.apache.eventmesh.api.producer.MeshMQProducer |  16 ---
 eventmesh-runtime/build.gradle                     |   4 +-
 .../runtime/core/plugin/MQConsumerWrapper.java     |  17 +--
 .../runtime/core/plugin/MQProducerWrapper.java     |  18 +--
 .../protocol/http/consumer/HandleMsgContext.java   |   1 -
 .../tcp/client/group/ClientGroupWrapper.java       |   1 -
 .../client/session/push/DownStreamMsgContext.java  |   1 -
 .../eventmesh/spi/EventMeshExtensionFactory.java   |  83 ++++++++++++-
 .../eventmesh/spi/EventMeshExtensionLoader.java    | 124 --------------------
 .../eventmesh/spi/loader/ExtensionClassLoader.java |  22 +++-
 .../spi/loader/JarExtensionClassLoader.java        | 130 +++++++++++++++++++++
 .../spi/loader/MetaInfExtensionClassLoader.java    |  92 +++++++++++++++
 eventmesh-starter/build.gradle                     |   4 +-
 eventmesh-test/build.gradle                        |   8 +-
 install.sh                                         |   4 +-
 settings.gradle                                    |   4 +-
 style/checkStyle-suppressions.xml                  |  25 ++++
 style/checkStyle.xml                               |  11 ++
 66 files changed, 441 insertions(+), 290 deletions(-)

diff --git a/build.gradle b/build.gradle
index 1ff8edf..1658b84 100644
--- a/build.gradle
+++ b/build.gradle
@@ -188,45 +188,6 @@ subprojects {
         }
     }
 
-//    checkstyle {
-//        toolVersion = "8.32"
-//        ignoreFailures = true
-//        sourceSets = [sourceSets.main]
-//        configFile = '../style/codeStyle.xml' as File
-//        showViolations true
-//    }
-//
-//    tasks.withType(Checkstyle) {
-//        reports {
-//            xml.enabled false
-//            html.enabled true
-//        }
-//    }
-//
-//    sourceSets {
-//        main {
-//            java {
-//                srcDir 'src/main/java'
-//            }
-//
-//            resources {
-//                srcDir 'src/main/resources'
-//            }
-//
-//        }
-//
-//        test {
-//            java {
-//                srcDir 'src/test/java'
-//            }
-//
-//            resources {
-//                srcDir 'src/test/resources'
-//            }
-//
-//        }
-//    }
-
     spotbugs {
         //toolVersion = '4.2.3'
         ignoreFailures = true
@@ -271,14 +232,6 @@ subprojects {
         }
     }
 
-//        tasks.withType(Pmd) {
-//            reports {
-//                xml.enabled = false
-//                html.enabled = true
-//            }
-//        }
-
-
     pmd {
         consoleOutput = true
         toolVersion = "6.23.0"
@@ -314,6 +267,7 @@ subprojects {
                 from project.jar.getArchivePath()
                 exclude 'eventmesh-common*.jar'
                 exclude 'eventmesh-connector-api*.jar'
+                exclude 'eventmesh-connector-plugin*.jar'
                 exclude 'eventmesh-starter*.jar'
                 exclude 'eventmesh-test*.jar'
                 exclude 'eventmesh-sdk*.jar'
diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
index 6ae41fa..1bc1aa8 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
@@ -20,7 +20,7 @@ Gradle至少为7.0, 推荐 7.0.*
 ```$ xslt
 unzip EventMesh-master.zip
 cd / *您的部署路径* /EventMesh-master
-gradle clean dist tar -x test
+gradle clean dist copyConnectorPlugin tar -x test
 ```
 
 您将在目录/ *您的部署路径* /EventMesh-master/eventmesh-runtime/dist中获得**eventmesh-runtime_1.0.0.tar.gz**
@@ -57,14 +57,20 @@ sh start.sh
 ![project-structure](../../images/project-structure.png)
 
 - eventmesh-common : eventmesh公共类与方法模块
-- eventmesh-connector-api : eventmesh插件接口定义模块
-- eventmesh-connector-rocketmq : eventmesh rocketmq插件模块
+- eventmesh-connector-api : eventmesh connector插件接口定义模块
+- eventmesh-connector-plugin : eventmesh connector插件模块
 - eventmesh-runtime : eventmesh运行时模块
 - eventmesh-sdk-java : eventmesh java客户端sdk
 - eventmesh-starter : eventmesh本地启动运行项目入口
 - eventmesh-spi : eventmesh SPI加载模块
 
-> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
+> 注:插件模块遵循eventmesh定义的SPI规范, 自定义的SPI接口需要使用注解@EventMeshSPI标识.
+> 插件实例需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件,文件名为SPI接口全类名. 
+> 文件内容为插件实例名到插件实例的映射, 具体可以参考eventmesh-connector-rocketmq插件模块
+
+插件可以从classpath和插件目录下面加载. 在本地开发阶段可以将使用的插件在eventmesh-starter模块build.gradle中进行声明,或者执行gradle的copyConnectorPlugin任务
+将插件拷贝至dist/plugin目录下, eventmesh默认会加载项目下dist/plugin目录下的插件, 加载目录可以通过-DeventMeshPluginDir=your_plugin_directory来改变插件目录.
+运行时需要使用的插件实例可以在eventmesh.properties中进行配置
 
 **2.3.2 配置插件**
 
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index 62b315c..d47d5d3 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -20,7 +20,7 @@ You will get **EventMesh-master.zip**
 ```$xslt
 unzip EventMesh-master.zip
 cd /*YOUR DEPLOY PATH*/EventMesh-master
-gradle clean dist tar -x test
+gradle clean dist copyConnectorPlugin tar -x test
 ```
 
 You will get **EventMesh_1.2.0.tar.gz** in directory /* YOUR DEPLOY PATH */EventMesh-master/build
@@ -58,14 +58,22 @@ Same with 1.2
 
 - eventmesh-common : eventmesh common classes and method module
 - eventmesh-connector-api : eventmesh connector api definition module
-- eventmesh-connector-rocketmq : eventmesh rocketmq connector module
+- eventmesh-connector-plugin : eventmesh connector plugin instance module
 - eventmesh-runtime : eventmesh runtime module
 - eventmesh-sdk-java : eventmesh java client sdk
 - eventmesh-starter : eventmesh project local start entry
 - eventmesh-spi : eventmesh SPI load module
 
-> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of
-related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module
+> ps: The plugin module follows the eventmesh SPI specification; a custom SPI interface needs to be marked with the @EventMeshSPI annotation.
+> Each plugin instance needs a mapping file of the related interface and implementation class under /main/resources/META-INF/eventmesh in the
+> corresponding module, named after the fully qualified class name of the SPI interface. The content of the file is a mapping of plugin instance name to plugin instance; you can find more
+> details in the eventmesh-connector-rocketmq module
+
+The plugin can be loaded from the classpath and from the plugin directory. During local development, you can declare the plugins in use in build.gradle of the eventmesh-starter module,
+or execute the copyConnectorPlugin gradle task to copy the plugin instance jar to the dist/plugin directory. By default, eventmesh loads the plugins in the project's
+dist/plugin directory; this can be changed by adding -DeventMeshPluginDir=your_plugin_directory.
+The plugin instances to be used at runtime can be configured in eventmesh.properties.
+
 
 **2.3.2 Configure plugin**
 
diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png
index efc5249..2305a4e 100644
Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java b/eventmesh-connector-plugin/build.gradle
similarity index 54%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
copy to eventmesh-connector-plugin/build.gradle
index c7e4e7f..be1a227 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
+++ b/eventmesh-connector-plugin/build.gradle
@@ -14,27 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eventmesh.api;
 
-import io.openmessaging.api.Action;
-import io.openmessaging.api.AsyncConsumeContext;
-
-public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {
-
-    private AbstractContext abstractContext;
-
-    public AbstractContext getAbstractContext() {
-        return abstractContext;
+task copyConnectorPlugin(dependsOn: ['jar']) {
+    doFirst {
+        new File(projectDir, '../eventmesh-connector-plugin/dist/apps').mkdir()
+        new File(projectDir, '../dist/plugin/connector').mkdirs()
     }
-
-    public void setAbstractContext(AbstractContext abstractContext) {
-        this.abstractContext = abstractContext;
-    }
-
-    public abstract void commit(EventMeshAction action);
-
-    @Override
-    public void commit(Action action) {
-        throw new UnsupportedOperationException("not support yet");
+    doLast {
+        copy {
+            into('../eventmesh-connector-plugin/dist/apps/')
+            from project.jar.getArchivePath()
+            exclude {
+                "eventmesh-connector-plugin-${version}.jar"
+                "eventmesh-connector-api-${version}.jar"
+            }
+        }
+        copy {
+            into '../dist/plugin/connector'
+            from "../eventmesh-connector-plugin/dist/apps/eventmesh-connector-rocketmq-${version}.jar"
+        }
     }
 }
\ No newline at end of file
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
similarity index 91%
rename from eventmesh-connector-api/build.gradle
rename to eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
index 157048e..5389a80 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
@@ -20,6 +20,6 @@ List open_message = [
 ]
 
 dependencies {
-    implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
+    api open_message, project(":eventmesh-common"), project(":eventmesh-spi")
     testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
 }
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
similarity index 100%
copy from eventmesh-connector-api/gradle.properties
copy to eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AbstractContext.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
similarity index 75%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
index b114953..cff0c8e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/factory/ConnectorPluginFactory.java
@@ -17,18 +17,33 @@
  * under the License.
  */
 
-package org.apache.eventmesh.runtime.core.plugin;
+package org.apache.eventmesh.api.factory;
 
 import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
 import org.apache.eventmesh.api.producer.MeshMQProducer;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
-public class PluginFactory {
+/**
+ * The factory to get connector {@link MeshMQProducer} and {@link MeshMQPushConsumer}
+ */
+public class ConnectorPluginFactory {
 
+    /**
+     * Get MeshMQProducer instance by plugin name
+     *
+     * @param connectorPluginName plugin name
+     * @return MeshMQProducer instance
+     */
     public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
         return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
     }
 
+    /**
+     * Get MeshMQPushConsumer instance by plugin name
+     *
+     * @param connectorPluginName plugin name
+     * @return MeshMQPushConsumer instance
+     */
     public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
         return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
     }
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
similarity index 100%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
diff --git a/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
similarity index 90%
rename from eventmesh-connector-rocketmq/build.gradle
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
index 6126b50..03830a2 100644
--- a/eventmesh-connector-rocketmq/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
@@ -55,6 +55,6 @@ List open_message = [
 ]
 
 dependencies {
-    implementation rocketmq, metrics,open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
-    testImplementation rocketmq, metrics,open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
+    api rocketmq, metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation rocketmq, metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
 }
diff --git a/eventmesh-connector-rocketmq/gradle.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/gradle.properties
similarity index 100%
rename from eventmesh-connector-rocketmq/gradle.properties
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/gradle.properties
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
similarity index 98%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 95c2fc9..7111ca9 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.eventmesh.connector.rocketmq.client.impl.consumer;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,6 +38,9 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MixAll;
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfig.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
similarity index 99%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
index b1c2732..9334b5f 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ConfigurationWrapper.java
@@ -73,4 +73,4 @@ public class ConfigurationWrapper {
     public String getProp(String key) {
         return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
     }
-}
\ No newline at end of file
+}
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
similarity index 98%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
index 7087739..9d0d347 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
@@ -40,7 +40,7 @@ import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
 import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;
 import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext;
 import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
-import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
+import org.apache.eventmesh.connector.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/ConsumeRequest.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/NonStandardKeys.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
similarity index 100%
copy from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
copy to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyContext.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshConsumeConcurrentlyStatus.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/patch/EventMeshMessageListenerConcurrently.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/AbstractOMSProducer.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/DefaultPromise.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/promise/FutureState.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
similarity index 100%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
similarity index 99%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
index 1140535..86a5ffd 100644
--- a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
@@ -23,7 +23,6 @@ import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Properties;
 
-import io.openmessaging.api.Action;
 import io.openmessaging.api.AsyncConsumeContext;
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Consumer;
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/producer/ProducerImplTest.java
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/promise/DefaultPromiseTest.java
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/utils/BeanUtilsTest.java
diff --git a/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.MessagingAccessPoint
diff --git a/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer
similarity index 100%
rename from eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/resources/META-INF/services/org.apache.io.openmessaging.producer.Producer
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-plugin/gradle.properties
similarity index 100%
rename from eventmesh-connector-api/gradle.properties
rename to eventmesh-connector-plugin/gradle.properties
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
deleted file mode 100644
index c98880a..0000000
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
deleted file mode 100644
index 28907ca..0000000
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index e5bb065..00779a7 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -31,6 +31,6 @@ List open_message = [
 
 
 dependencies {
-    implementation  metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
-    testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
+    implementation metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index b09f9ed..68ea786 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -19,13 +19,13 @@ package org.apache.eventmesh.runtime.core.plugin;
 
 import java.util.List;
 import java.util.Properties;
-import java.util.ServiceLoader;
 
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Message;
 
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +36,7 @@ public class MQConsumerWrapper extends MQWrapper {
     protected MeshMQPushConsumer meshMQPushConsumer;
 
     public MQConsumerWrapper(String connectorPluginType) {
-        this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
+        this.meshMQPushConsumer = ConnectorPluginFactory.getMeshMQPushConsumer(connectorPluginType);
         if (meshMQPushConsumer == null) {
             logger.error("can't load the meshMQPushConsumer plugin, please check.");
             throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
@@ -52,24 +52,11 @@ public class MQConsumerWrapper extends MQWrapper {
     }
 
     public synchronized void init(Properties keyValue) throws Exception {
-        meshMQPushConsumer = getMeshMQPushConsumer();
-        if (meshMQPushConsumer == null) {
-            logger.error("can't load the meshMQPushConsumer plugin, please check.");
-            throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
-        }
 
         meshMQPushConsumer.init(keyValue);
         inited.compareAndSet(false, true);
     }
 
-    private MeshMQPushConsumer getMeshMQPushConsumer() {
-        ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class);
-        if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
-            return meshMQPushConsumerServiceLoader.iterator().next();
-        }
-        return null;
-    }
-
     public synchronized void start() throws Exception {
         meshMQPushConsumer.start();
         started.compareAndSet(false, true);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
index 2e55fbf..f979dc4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
@@ -18,12 +18,12 @@
 package org.apache.eventmesh.runtime.core.plugin;
 
 import java.util.Properties;
-import java.util.ServiceLoader;
 
 import io.openmessaging.api.Message;
 import io.openmessaging.api.SendCallback;
 
 import org.apache.eventmesh.api.RRCallback;
+import org.apache.eventmesh.api.factory.ConnectorPluginFactory;
 import org.apache.eventmesh.api.producer.MeshMQProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public class MQProducerWrapper extends MQWrapper {
     protected MeshMQProducer meshMQProducer;
 
     public MQProducerWrapper(String connectorPluginType) {
-        this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
+        this.meshMQProducer = ConnectorPluginFactory.getMeshMQProducer(connectorPluginType);
         if (meshMQProducer == null) {
             logger.error("can't load the meshMQProducer plugin, please check.");
             throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
@@ -47,24 +47,10 @@ public class MQProducerWrapper extends MQWrapper {
             return;
         }
 
-        meshMQProducer = getSpiMeshMQProducer();
-        if (meshMQProducer == null) {
-            logger.error("can't load the meshMQProducer plugin, please check.");
-            throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
-        }
-
         meshMQProducer.init(keyValue);
         inited.compareAndSet(false, true);
     }
 
-    private MeshMQProducer getSpiMeshMQProducer() {
-        ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class);
-        if (meshMQProducerServiceLoader.iterator().hasNext()) {
-            return meshMQProducerServiceLoader.iterator().next();
-        }
-        return null;
-    }
-
     public synchronized void start() throws Exception {
         if (started.get()) {
             return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
index 2d19174..3ce124c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 310ea6d..946cc8f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -50,7 +50,6 @@ import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
-import org.apache.eventmesh.runtime.core.plugin.PluginFactory;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 8e441bd..712e6fa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.eventmesh.api.AbstractContext;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
index 6aea9db..6388809 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
@@ -18,11 +18,42 @@
 package org.apache.eventmesh.spi;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.spi.loader.ExtensionClassLoader;
+import org.apache.eventmesh.spi.loader.JarExtensionClassLoader;
+import org.apache.eventmesh.spi.loader.MetaInfExtensionClassLoader;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The extension fetching factory: all extension plugins should be obtained through this factory.
+ * Every extension plugin interface defined in EventMesh must carry the {@link EventMeshSPI} annotation.
+ */
 public enum EventMeshExtensionFactory {
     ;
 
+    private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionFactory.class);
+
+    // Loaders are consulted in the order they are added below; the first one that knows a name wins.
+    private static final List<ExtensionClassLoader> extensionClassLoaders = new ArrayList<>();
+
+    static {
+        extensionClassLoaders.add(new MetaInfExtensionClassLoader());
+        extensionClassLoaders.add(new JarExtensionClassLoader());
+    }
+
+    private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
+
+    /**
+     * @param extensionType extension plugin class type
+     * @param extensionName extension instance name
+     * @param <T>           the type of the plugin
+     * @return plugin instance
+     */
     public static <T> T getExtension(Class<T> extensionType, String extensionName) {
         if (extensionType == null) {
             throw new ExtensionException("extensionType is null");
@@ -33,6 +64,56 @@ public enum EventMeshExtensionFactory {
         if (!extensionType.isInterface() || !extensionType.isAnnotationPresent(EventMeshSPI.class)) {
             throw new ExtensionException(String.format("extensionType:%s is invalided", extensionType));
         }
-        return EventMeshExtensionLoader.getExtension(extensionType, extensionName);
+        EventMeshSPI eventMeshSPIAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
+        if (eventMeshSPIAnnotation.isSingleton()) {
+            return getSingletonExtension(extensionType, extensionName);
+        }
+        return getPrototypeExtension(extensionType, extensionName);
     }
+
+    /**
+     * Returns the shared instance for the given extension type and name, creating it on first use.
+     * The cache key combines type and name: keyed by name alone, two SPI interfaces that share an
+     * extension name would be handed the same cached object and fail with a ClassCastException.
+     *
+     * @param extensionType extension plugin class type
+     * @param extensionName extension instance name
+     * @param <T>           the type of the plugin
+     * @return the cached instance, or {@code null} (nothing cached) if no extension class is found
+     * @throws ExtensionException if the extension class cannot be instantiated
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> T getSingletonExtension(Class<T> extensionType, String extensionName) {
+        String cacheKey = extensionType.getName() + "#" + extensionName;
+        return (T) EXTENSION_INSTANCE_CACHE.computeIfAbsent(
+                cacheKey, key -> getPrototypeExtension(extensionType, extensionName));
+    }
+
+    /** Instantiates the named extension anew on every call; returns null if no class is registered for it. */
+    private static <T> T getPrototypeExtension(Class<T> extensionType, String extensionName) {
+        Class<T> pluginClass = getExtensionClass(extensionType, extensionName);
+        if (pluginClass == null) {
+            return null;
+        }
+        try {
+            T plugin = pluginClass.newInstance();
+            logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName);
+            return plugin;
+        } catch (InstantiationException | IllegalAccessException failure) {
+            throw new ExtensionException("Extension initialize error", failure);
+        }
+    }
+
+    /** Asks each registered loader in turn and returns the first class mapped to the name, or null if none. */
+    @SuppressWarnings("unchecked")
+    private static <T> Class<T> getExtensionClass(Class<T> extensionType, String extensionName) {
+        for (ExtensionClassLoader loader : extensionClassLoaders) {
+            Map<String, Class<?>> classesByName = loader.loadExtensionClass(extensionType);
+            Class<?> candidate = classesByName.get(extensionName);
+            if (candidate != null) {
+                return (Class<T>) candidate;
+            }
+        }
+        return null;
+    }
 }
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
deleted file mode 100644
index 9531bc9..0000000
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.spi;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-public enum EventMeshExtensionLoader {
-    ;
-
-    private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class);
-
-    private static final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, Class<?>>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16);
-
-    private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
-
-    private static final String EVENTMESH_EXTENSION_DIR = "META-INF/eventmesh/";
-
-    @SuppressWarnings("unchecked")
-    public static <T> T getExtension(Class<T> extensionType, String extensionName) {
-        if (!hasLoadExtensionClass(extensionType)) {
-            loadExtensionClass(extensionType);
-        }
-        if (!hasInitializeExtension(extensionName)) {
-            T instance = initializeExtension(extensionType, extensionName);
-            EventMeshSPI spiAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
-            if (!spiAnnotation.isSingleton()) {
-                return instance;
-            }
-            EXTENSION_INSTANCE_CACHE.put(extensionName, instance);
-        }
-        return (T) EXTENSION_INSTANCE_CACHE.get(extensionName);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T> T initializeExtension(Class<T> extensionType, String extensionName) {
-        ConcurrentHashMap<String, Class<?>> extensionClassMap = EXTENSION_CLASS_LOAD_CACHE.get(extensionType);
-        if (extensionClassMap == null) {
-            throw new ExtensionException(String.format("Extension type:%s has not been loaded", extensionType));
-        }
-        if (!extensionClassMap.containsKey(extensionName)) {
-            throw new ExtensionException(String.format("Extension name:%s has not been loaded", extensionName));
-        }
-        Class<?> aClass = extensionClassMap.get(extensionName);
-        try {
-            Object extensionObj = aClass.newInstance();
-            logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName);
-            return (T) extensionObj;
-        } catch (InstantiationException | IllegalAccessException e) {
-            throw new ExtensionException("Extension initialize error", e);
-        }
-    }
-
-    public static <T> void loadExtensionClass(Class<T> extensionType) {
-        String extensionFileName = EVENTMESH_EXTENSION_DIR + extensionType.getName();
-        ClassLoader classLoader = EventMeshExtensionLoader.class.getClassLoader();
-        try {
-            Enumeration<URL> extensionUrls = classLoader.getResources(extensionFileName);
-            if (extensionUrls != null) {
-                while (extensionUrls.hasMoreElements()) {
-                    URL url = extensionUrls.nextElement();
-                    loadResources(url, extensionType);
-                }
-            }
-        } catch (IOException e) {
-            throw new ExtensionException("load extension class error", e);
-        }
-
-
-    }
-
-    private static <T> void loadResources(URL url, Class<T> extensionType) throws IOException {
-        try (InputStream inputStream = url.openStream()) {
-            Properties properties = new Properties();
-            properties.load(inputStream);
-            properties.forEach((extensionName, extensionClass) -> {
-                String extensionNameStr = (String) extensionName;
-                String extensionClassStr = (String) extensionClass;
-                try {
-                    Class<?> targetClass = Class.forName(extensionClassStr);
-                    logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass);
-                    if (!extensionType.isAssignableFrom(targetClass)) {
-                        throw new ExtensionException(
-                                String.format("class: %s is not subClass of %s", targetClass, extensionType));
-                    }
-                    EXTENSION_CLASS_LOAD_CACHE.computeIfAbsent(extensionType, k -> new ConcurrentHashMap<>())
-                            .put(extensionNameStr, targetClass);
-                } catch (ClassNotFoundException e) {
-                    throw new ExtensionException("load extension class error", e);
-                }
-            });
-        }
-    }
-
-    private static <T> boolean hasLoadExtensionClass(Class<T> extensionType) {
-        return EXTENSION_CLASS_LOAD_CACHE.containsKey(extensionType);
-    }
-
-    private static boolean hasInitializeExtension(String extensionName) {
-        return EXTENSION_INSTANCE_CACHE.containsKey(extensionName);
-    }
-}
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java
similarity index 62%
rename from eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
rename to eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java
index 063f0f3..feca359 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/domain/RocketMQConstants.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/ExtensionClassLoader.java
@@ -15,13 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.rocketmq.domain;
+package org.apache.eventmesh.spi.loader;
 
-public interface RocketMQConstants {
+import java.util.Map;
+
+/**
+ * Load extension class
+ * <ul>
+ *     <li>{@link MetaInfExtensionClassLoader}</li>
+ *     <li>{@link JarExtensionClassLoader}</li>
+ * </ul>
+ */
+public interface ExtensionClassLoader {
 
     /**
-     * Key of scheduled message delivery time
+     * Load the extension classes available for the given extension type, keyed by extension name.
+     *
+     * @param extensionType extension type class
+     * @param <T>           extension type
+     * @return extension instance name to extension instance class
      */
-    String START_DELIVER_TIME = "__STARTDELIVERTIME";
-
+    <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType);
 }
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
new file mode 100644
index 0000000..7d591a8
--- /dev/null
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.spi.loader;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.eventmesh.spi.ExtensionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load extension from '${eventMeshPluginDir}', the default loading directory is './dist/plugin'
+ */
+public class JarExtensionClassLoader implements ExtensionClassLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger(JarExtensionClassLoader.class);
+
+    private static final ConcurrentHashMap<Class<?>, Map<String, Class<?>>> EXTENSION_CLASS_CACHE =
+            new ConcurrentHashMap<>(16);
+
+    private static final String EVENTMESH_EXTENSION_PLUGIN_DIR = System.getProperty("eventMeshPluginDir",
+            "./dist/plugin");
+
+    private static final String EVENTMESH_EXTENSION_META_DIR = "META-INF/eventmesh/";
+
+    @Override
+    public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType) {
+        return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, this::doLoadExtensionClass);
+    }
+
+    private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType) {
+        Map<String, Class<?>> extensionMap = new HashMap<>();
+
+        List<URL> pluginJarPaths = loadJarPathFromResource(EVENTMESH_EXTENSION_PLUGIN_DIR);
+        if (CollectionUtils.isEmpty(pluginJarPaths)) {
+            return extensionMap;
+        }
+
+        String extensionFileName = EVENTMESH_EXTENSION_META_DIR + extensionType.getName();
+        URLClassLoader urlClassLoader = URLClassLoader.newInstance(pluginJarPaths.toArray(new URL[0]));
+        try {
+            Enumeration<URL> extensionUrls = urlClassLoader.getResources(extensionFileName);
+            if (extensionUrls != null) {
+                while (extensionUrls.hasMoreElements()) {
+                    URL url = extensionUrls.nextElement();
+                    extensionMap.putAll(loadResources(urlClassLoader, url, extensionType));
+                }
+            }
+        } catch (IOException e) {
+            throw new ExtensionException("load extension class error", e);
+        }
+        return extensionMap;
+    }
+
+    private List<URL> loadJarPathFromResource(String pluginPath) {
+        File plugin = new File(pluginPath);
+        if (!plugin.exists()) {
+            logger.warn("plugin dir:{} is not exist", pluginPath);
+            return Lists.newArrayList();
+        }
+        if (plugin.isFile() && plugin.getName().endsWith(".jar")) {
+            try {
+                return Lists.newArrayList(plugin.toURI().toURL());
+            } catch (Exception e) {
+                throw new ExtensionException(e);
+            }
+        }
+        File[] files = plugin.listFiles();
+        List<URL> pluginUrls = new ArrayList<>();
+        if (files != null) {
+            for (File file : files) {
+                pluginUrls.addAll(loadJarPathFromResource(file.getPath()));
+            }
+        }
+        return pluginUrls;
+    }
+
+    private static <T> Map<String, Class<?>> loadResources(URLClassLoader urlClassLoader, URL url, Class<T> extensionType) throws IOException {
+        Map<String, Class<?>> extensionMap = new HashMap<>();
+        try (InputStream inputStream = url.openStream()) {
+            Properties properties = new Properties();
+            properties.load(inputStream);
+            properties.forEach((extensionName, extensionClass) -> {
+                String extensionNameStr = (String) extensionName;
+                String extensionClassStr = (String) extensionClass;
+                try {
+                    Class<?> targetClass = urlClassLoader.loadClass(extensionClassStr);
+                    logger.info("load extension class success, extensionType: {}, extensionClass: {}",
+                            extensionType, targetClass);
+                    if (!extensionType.isAssignableFrom(targetClass)) {
+                        throw new ExtensionException(
+                                String.format("class: %s is not subClass of %s", targetClass, extensionType));
+                    }
+                    extensionMap.put(extensionNameStr, targetClass);
+                } catch (ClassNotFoundException e) {
+                    throw new ExtensionException("load extension class error", e);
+                }
+            });
+        }
+        return extensionMap;
+    }
+}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java
new file mode 100644
index 0000000..59d45e1
--- /dev/null
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/MetaInfExtensionClassLoader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.spi.loader;
+
+import org.apache.eventmesh.spi.ExtensionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load extension from classpath.
+ *
+ * <p>The {@code META-INF/eventmesh/<extension type name>} properties resources visible to this class's own
+ * class loader map an extension name to the implementation class name. The result is cached per extension
+ * type.
+ */
+public class MetaInfExtensionClassLoader implements ExtensionClassLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger(MetaInfExtensionClassLoader.class);
+
+    /**
+     * Extension type to (extension name to implementation class), filled on the first request for a type.
+     */
+    private static final ConcurrentHashMap<Class<?>, Map<String, Class<?>>> EXTENSION_CLASS_CACHE =
+            new ConcurrentHashMap<>(16);
+
+    private static final String EVENTMESH_EXTENSION_META_DIR = "META-INF/eventmesh/";
+
+    /**
+     * Return the extension classes declared for {@code extensionType}, reading the descriptor resources
+     * only when the type has not been requested before.
+     *
+     * @param extensionType extension type class
+     * @param <T>           extension type
+     * @return extension name to implementation class; empty when no descriptor resource is found
+     */
+    @Override
+    public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType) {
+        return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, this::doLoadExtensionClass);
+    }
+
+    /**
+     * Find every descriptor resource for {@code extensionType} and load the classes it declares.
+     * When several descriptor resources declare the same extension name, the one read last wins.
+     *
+     * @throws ExtensionException if a descriptor cannot be read or a declared class is missing or of the
+     *                            wrong type
+     */
+    private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType) {
+        Map<String, Class<?>> extensionMap = new HashMap<>();
+        String extensionFileName = EVENTMESH_EXTENSION_META_DIR + extensionType.getName();
+        ClassLoader classLoader = MetaInfExtensionClassLoader.class.getClassLoader();
+        try {
+            Enumeration<URL> extensionUrls = classLoader.getResources(extensionFileName);
+            if (extensionUrls != null) {
+                while (extensionUrls.hasMoreElements()) {
+                    URL url = extensionUrls.nextElement();
+                    extensionMap.putAll(loadResources(url, extensionType));
+                }
+            }
+        } catch (IOException e) {
+            throw new ExtensionException("load extension class error", e);
+        }
+        return extensionMap;
+    }
+
+    /**
+     * Read one descriptor resource and load the classes it declares with {@link Class#forName(String)}.
+     *
+     * @param url           location of the properties resource (extension name = class name)
+     * @param extensionType type every declared class must be assignable to
+     * @return extension name to implementation class for this resource
+     * @throws IOException        if the resource cannot be read
+     * @throws ExtensionException if a declared class is not found or is not a subtype of the extension type
+     */
+    private static <T> Map<String, Class<?>> loadResources(URL url, Class<T> extensionType) throws IOException {
+        Map<String, Class<?>> extensionMap = new HashMap<>();
+        try (InputStream inputStream = url.openStream()) {
+            Properties properties = new Properties();
+            properties.load(inputStream);
+            properties.forEach((extensionName, extensionClass) -> {
+                String extensionNameStr = (String) extensionName;
+                String extensionClassStr = (String) extensionClass;
+                try {
+                    Class<?> targetClass = Class.forName(extensionClassStr);
+                    // Validate first, so a class of the wrong type is never reported as loaded.
+                    if (!extensionType.isAssignableFrom(targetClass)) {
+                        throw new ExtensionException(
+                                String.format("class: %s is not subClass of %s", targetClass, extensionType));
+                    }
+                    logger.info("load extension class success, extensionType: {}, extensionClass: {}",
+                            extensionType, targetClass);
+                    extensionMap.put(extensionNameStr, targetClass);
+                } catch (ClassNotFoundException e) {
+                    throw new ExtensionException("load extension class error", e);
+                }
+            });
+        }
+        return extensionMap;
+    }
+}
diff --git a/eventmesh-starter/build.gradle b/eventmesh-starter/build.gradle
index 7b81c6b..48aafcb 100644
--- a/eventmesh-starter/build.gradle
+++ b/eventmesh-starter/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    implementation project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-    testImplementation project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
+    implementation project(":eventmesh-runtime")
+    testImplementation project(":eventmesh-runtime")
     //testImplementation group: 'junit', name: 'junit', version: '4.12'
 }
\ No newline at end of file
diff --git a/eventmesh-test/build.gradle b/eventmesh-test/build.gradle
index 567374f..8844c4c 100644
--- a/eventmesh-test/build.gradle
+++ b/eventmesh-test/build.gradle
@@ -25,8 +25,12 @@ List spring_framework = [
 dependencies {
 //    compile log4j2, sl4j
 //    testCompile log4j2, sl4j
-    implementation project(":eventmesh-sdk-java"), project(":eventmesh-connector-api"), project(":eventmesh-common"), spring_framework
-    testImplementation project(":eventmesh-sdk-java"), project(":eventmesh-connector-api"), project(":eventmesh-common"), spring_framework
+    implementation project(":eventmesh-sdk-java")
+    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    implementation spring_framework
+    testImplementation project(":eventmesh-sdk-java")
+    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation spring_framework
 }
 
 configurations.all {
diff --git a/install.sh b/install.sh
index 19397ac..7acf43d 100644
--- a/install.sh
+++ b/install.sh
@@ -26,7 +26,7 @@
 # jar        : produce jar
 
 # package tar.gz/zip
-gradle clean -Pdev=true -Pjdk=1.7 dist tar zip
+gradle clean -Pdev=true -Pjdk=1.8 dist tar zip
 
 # package jar
-gradle clean -Pdev=true -Pjdk=1.7 jar
\ No newline at end of file
+gradle clean -Pdev=true -Pjdk=1.8 jar
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 2b5e0af..7c0e70a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,11 +18,11 @@
 rootProject.name = 'EventMesh'
 String jdkVersion = "${jdk}"
 include 'eventmesh-runtime'
-include 'eventmesh-connector-rocketmq'
 include 'eventmesh-sdk-java'
 include 'eventmesh-common'
-include 'eventmesh-connector-api'
 include 'eventmesh-starter'
 include 'eventmesh-test'
 include 'eventmesh-spi'
+include 'eventmesh-connector-plugin:eventmesh-connector-api'
+include 'eventmesh-connector-plugin:eventmesh-connector-rocketmq'
 
diff --git a/style/checkStyle-suppressions.xml b/style/checkStyle-suppressions.xml
new file mode 100644
index 0000000..fd4a42f
--- /dev/null
+++ b/style/checkStyle-suppressions.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+<!DOCTYPE suppressions PUBLIC
+        "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+        "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<suppressions>
+
+</suppressions>
\ No newline at end of file
diff --git a/style/checkStyle.xml b/style/checkStyle.xml
index d8ff7bd..dc98c99 100644
--- a/style/checkStyle.xml
+++ b/style/checkStyle.xml
@@ -21,6 +21,12 @@
   "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
   "http://checkstyle.org/dtds/configuration_1_3.dtd">
 <module name="Checker">
+
+  <module name="SuppressionFilter">
+    <property name="file" value="./style/checkStyle-suppressions.xml"/>
+    <property name="optional" value="false"/>
+  </module>
+
   <module name="RegexpSingleline">
     <property name="format" value="System\..+\.println"/>
     <property name="message" value="Prohibit invoking System.*.println in source code !"/>
@@ -44,4 +50,9 @@
       <property name="format" value="^(org)\.apache(\.[a-zA-Z][a-zA-Z0-9]*)+$"/>
     </module>
   </module>
+
+  <module name="BeforeExecutionExclusionFileFilter">
+    <property name="fileNamePattern" value="./eventmesh-runtime/conf/sChat2.jks$"/>
+  </module>
+
 </module>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org