Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/07/09 06:16:05 UTC

[incubator-eventmesh] branch 1.3.0 updated: [ISSUE #418]Refactor the plugin load code (#421)

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/1.3.0 by this push:
     new 8ecbdcf  [ISSUE #418]Refactor the plugin load code (#421)
8ecbdcf is described below

commit 8ecbdcf1e5a176a81a24fde5faba6a08726f76f5
Author: Wenjun Ruan <86...@qq.com>
AuthorDate: Fri Jul 9 14:15:59 2021 +0800

    [ISSUE #418]Refactor the plugin load code (#421)
    
    * [ISSUE #418]Refactor the plugin load code
    
    * fix ut
---
 .../eventmesh-runtime-quickstart.zh-CN.md          |  16 ++--
 .../instructions/eventmesh-runtime-quickstart.md   |  16 ++--
 docs/images/project-structure.png                  | Bin 63401 -> 54777 bytes
 .../common/config/CommonConfiguration.java         | 105 ++-------------------
 .../src/test/resources/configuration.properties    |   1 +
 eventmesh-connector-api/build.gradle               |   4 +-
 eventmesh-connector-api/gradle.properties          |   2 +-
 .../eventmesh/api/consumer/MeshMQPushConsumer.java |   2 +
 .../eventmesh/api/producer/MeshMQProducer.java     |   2 +
 ...pache.eventmesh.api.consumer.MeshMQPushConsumer |   2 +-
 ...rg.apache.eventmesh.api.producer.MeshMQProducer |   2 +-
 eventmesh-runtime/build.gradle                     |   4 +-
 eventmesh-runtime/conf/eventmesh.properties        |   5 +-
 .../runtime/core/plugin/MQConsumerWrapper.java     |  23 ++---
 .../runtime/core/plugin/MQProducerWrapper.java     |  22 ++---
 .../runtime/core/plugin/PluginFactory.java         |  39 ++++++++
 .../protocol/http/consumer/EventMeshConsumer.java  |   6 +-
 .../protocol/http/producer/EventMeshProducer.java  |   4 +-
 .../tcp/client/group/ClientGroupWrapper.java       |  12 ++-
 .../eventmesh/spi/EventMeshExtensionFactory.java   |   1 +
 .../eventmesh/spi/EventMeshExtensionLoader.java    |  10 +-
 .../eventmesh/http/demo/SyncRequestInstance.java   |  11 ++-
 22 files changed, 126 insertions(+), 163 deletions(-)
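
Note on the mechanism: the patch below moves the connector mapping files from META-INF/services (one implementation class per line, resolved by java.util.ServiceLoader) to META-INF/eventmesh (name=class entries, resolved by the plugin name configured in eventMesh.connector.plugin.type). The following is a minimal sketch of that kind of name-keyed lookup, not the EventMeshExtensionLoader code itself. The class NamedExtensionSketch, its load method, and the type-plus-name cache key are hypothetical, and the mapping text is passed in as a string (with JDK classes standing in for connector implementations) so the example runs without a classpath resource.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of name-keyed extension loading; not the EventMesh implementation. */
class NamedExtensionSketch {

    // Instance cache. The key combines extension type and name (a choice made for this sketch).
    private static final Map<String, Object> INSTANCES = new ConcurrentHashMap<>();

    /**
     * Parses "name=implementationClass" lines and returns an instance of the named entry.
     * A real loader would read the text from a classpath resource such as
     * META-INF/eventmesh/<interface name>; here it is a parameter to keep the sketch runnable.
     */
    static <T> T load(Class<T> type, String name, String mappingText) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(mappingText));
        } catch (IOException e) {
            throw new IllegalStateException("cannot read mapping", e);
        }
        String className = mapping.getProperty(name);
        if (className == null) {
            throw new IllegalArgumentException(
                    "no extension named '" + name + "' for " + type.getName());
        }
        Object instance = INSTANCES.computeIfAbsent(type.getName() + "#" + name, key -> {
            try {
                Class<?> impl = Class.forName(className);
                // Reject entries whose class does not implement the requested type.
                if (!type.isAssignableFrom(impl)) {
                    throw new IllegalStateException(impl + " does not implement " + type);
                }
                return impl.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + className, e);
            }
        });
        return type.cast(instance);
    }

    public static void main(String[] args) {
        // Two named implementations of the same interface, selected by name.
        String mapping = "list=java.util.ArrayList\nlinked=java.util.LinkedList\n";
        List<?> first = load(List.class, "list", mapping);
        List<?> second = load(List.class, "linked", mapping);
        System.out.println(first.getClass().getName());   // prints java.util.ArrayList
        System.out.println(second.getClass().getName());  // prints java.util.LinkedList
    }
}
```

Selecting by name is what lets several connector implementations sit on the classpath at once while a single property decides which one is used. Because this sketch caches instances, repeated calls with the same type and name return the same object.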

diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
index adc9703..ecafc29 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
@@ -62,8 +62,9 @@ sh start.sh
 - eventmesh-runtime : eventmesh运行时模块
 - eventmesh-sdk-java : eventmesh java客户端sdk
 - eventmesh-starter : eventmesh本地启动运行项目入口
+- eventmesh-spi : eventmesh SPI加载模块
 
-> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件
+> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
 
 **2.3.2 配置VM启动参数**
 
@@ -75,18 +76,17 @@ sh start.sh
 ```
 > 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
 
-**2.3.3 配置build.gradle文件**
+**2.3.3 配置插件**
 
-通过修改dependencies,compile project 项来指定项目启动后加载的插件
+在`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件
 
-修改`eventmesh-starter`模块下面的`build.gradle`文件
+修改`confPath`目录下面的`eventMesh.properties`文件
 
-加载**RocketMQ**插件配置:
+加载**RocketMQ Connector**插件配置:
 
 ```java
-dependencies {
-    compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin 
+eventMesh.connector.plugin.type=rocketmq
 ```
 
 **2.3.4 启动运行**
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index 0536e5b..dd89795 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -62,9 +62,10 @@ Same with 1.2
 - eventmesh-runtime : eventmesh runtime module
 - eventmesh-sdk-java : eventmesh java client sdk
 - eventmesh-starter : eventmesh project local start entry
+- eventmesh-spi : eventmesh SPI load module
 
-> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of
-related interface and implementation class under /main/resources/meta-inf/services in the corresponding module
+> ps: Connector plugins are loaded through the eventmesh SPI mechanism, so each plugin module must provide a mapping file
+from the related interface to its implementation class under /main/resources/META-INF/eventmesh
 
 **2.3.2 Configure VM Options**
 
@@ -76,18 +77,17 @@ related interface and implementation class under /main/resources/meta-inf/servic
 ```
 > ps: If you use Windows, you may need to replace the file separator to \
 
-**2.3.3 Configure build.gradle file**
+**2.3.3 Configure plugin**
 
-Specify the connector that will be loaded after the project start with updating compile project item in dependencies
+Specify the connector plugin to load at startup by declaring it in `eventMesh.properties`
 
-update `build.gradle` file under the `eventmesh-starter` module
+Modify the `eventMesh.properties` file in the `confPath` directory
 
 load **rocketmq connector** configuration:
 
 ```java
-dependencies {
-    compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
 ```
 
 **2.3.4 Run**
diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png
index 252a953..efc5249 100644
Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 08a44cb..1ae6b26 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -17,17 +17,10 @@
 
 package org.apache.eventmesh.common.config;
 
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.IPUtil;
 
 public class CommonConfiguration {
     public String eventMeshEnv = "P";
@@ -35,7 +28,7 @@ public class CommonConfiguration {
     public String eventMeshCluster = "LS";
     public String eventMeshName = "";
     public String sysID = "5477";
-
+    public String eventMeshConnectorPluginType = "rocketmq";
 
     public String namesrvAddr = "";
     public String clientUserName = "username";
@@ -84,8 +77,11 @@ public class CommonConfiguration {
 
             eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
             if (StringUtils.isBlank(eventMeshServerIp)) {
-                eventMeshServerIp = getLocalAddr();
+                eventMeshServerIp = IPUtil.getLocalAddress();
             }
+
+            eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
+            Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
         }
     }
 
@@ -105,94 +101,7 @@ public class CommonConfiguration {
         public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills";
 
         public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
-    }
-
-    public static String getLocalAddr() {
-        //priority of networkInterface when generating client ip
-        String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
-        ArrayList<String> preferList = new ArrayList<String>();
-        for (String eth : priority.split("<")) {
-            preferList.add(eth);
-        }
-        NetworkInterface preferNetworkInterface = null;
-
-        try {
-            Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
-            while (enumeration1.hasMoreElements()) {
-                final NetworkInterface networkInterface = enumeration1.nextElement();
-                if (!preferList.contains(networkInterface.getName())) {
-                    continue;
-                } else if (preferNetworkInterface == null) {
-                    preferNetworkInterface = networkInterface;
-                }
-                //get the networkInterface that has higher priority
-                else if (preferList.indexOf(networkInterface.getName())
-                        > preferList.indexOf(preferNetworkInterface.getName())) {
-                    preferNetworkInterface = networkInterface;
-                }
-            }
-
-            // Traversal Network interface to get the first non-loopback and non-private address
-            ArrayList<String> ipv4Result = new ArrayList<String>();
-            ArrayList<String> ipv6Result = new ArrayList<String>();
-
-            if (preferNetworkInterface != null) {
-                final Enumeration<InetAddress> en = preferNetworkInterface.getInetAddresses();
-                getIpResult(ipv4Result, ipv6Result, en);
-            } else {
-                Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
-                while (enumeration.hasMoreElements()) {
-                    final NetworkInterface networkInterface = enumeration.nextElement();
-                    final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
-                    getIpResult(ipv4Result, ipv6Result, en);
-                }
-            }
-
-            // prefer ipv4
-            if (!ipv4Result.isEmpty()) {
-                for (String ip : ipv4Result) {
-                    if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
-                        continue;
-                    }
-
-                    return ip;
-                }
 
-                return ipv4Result.get(ipv4Result.size() - 1);
-            } else if (!ipv6Result.isEmpty()) {
-                return ipv6Result.get(0);
-            }
-            //If failed to find,fall back to localhost
-            final InetAddress localHost = InetAddress.getLocalHost();
-            return normalizeHostAddress(localHost);
-        } catch (SocketException e) {
-            e.printStackTrace();
-        } catch (UnknownHostException e) {
-            e.printStackTrace();
-        }
-
-        return null;
-    }
-
-    public static String normalizeHostAddress(final InetAddress localHost) {
-        if (localHost instanceof Inet6Address) {
-            return "[" + localHost.getHostAddress() + "]";
-        } else {
-            return localHost.getHostAddress();
-        }
-    }
-
-    private static void getIpResult(ArrayList<String> ipv4Result, ArrayList<String> ipv6Result,
-                                    Enumeration<InetAddress> en) {
-        while (en.hasMoreElements()) {
-            final InetAddress address = en.nextElement();
-            if (!address.isLoopbackAddress()) {
-                if (address instanceof Inet6Address) {
-                    ipv6Result.add(normalizeHostAddress(address));
-                } else {
-                    ipv4Result.add(normalizeHostAddress(address));
-                }
-            }
-        }
+        public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
     }
 }
\ No newline at end of file
diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties
index d7c5096..76f29f2 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-common/src/test/resources/configuration.properties
@@ -21,3 +21,4 @@ eventMesh.sysid=3
 eventMesh.server.cluster=value4
 eventMesh.server.name=value5
 eventMesh.server.hostIp=value6
+eventMesh.connector.plugin.type=rocketmq
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-api/build.gradle
index 2d1205d..157048e 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-api/build.gradle
@@ -20,6 +20,6 @@ List open_message = [
 ]
 
 dependencies {
-    implementation open_message,project(":eventmesh-common")
-    testImplementation open_message,project(":eventmesh-common")
+    implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
+    testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
 }
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-api/gradle.properties
index ae30087..9d1744e 100644
--- a/eventmesh-connector-api/gradle.properties
+++ b/eventmesh-connector-api/gradle.properties
@@ -16,6 +16,6 @@
 #
 group=org.apache.eventmesh
 version=1.2.0-SNAPSHOT
-jdk=1.7
+jdk=1.8
 mavenUserName=
 mavenPassword=
\ No newline at end of file
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
index 5e60e0e..4ac1edb 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
@@ -25,7 +25,9 @@ import io.openmessaging.api.Consumer;
 import io.openmessaging.api.Message;
 
 import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.spi.EventMeshSPI;
 
+@EventMeshSPI
 public interface MeshMQPushConsumer extends Consumer {
 
     void init(Properties keyValue) throws Exception;
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
index 82ca583..c717385 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
@@ -24,7 +24,9 @@ import io.openmessaging.api.Producer;
 import io.openmessaging.api.SendCallback;
 
 import org.apache.eventmesh.api.RRCallback;
+import org.apache.eventmesh.spi.EventMeshSPI;
 
+@EventMeshSPI
 public interface MeshMQProducer extends Producer {
 
     void init(Properties properties) throws Exception;
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
similarity index 90%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
rename to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
index c98880a..0df2e28 100644
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
+++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
+rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
similarity index 90%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
rename to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
index 28907ca..ef4959d 100644
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
+++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
+rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 09dd79f..e5bb065 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -31,6 +31,6 @@ List open_message = [
 
 
 dependencies {
-    implementation  metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
-    testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
+    implementation  metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
+    testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
 }
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 035b950..45fc193 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106
 eventMesh.server.registry.registerIntervalInMills=10000
 eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
 #auto-ack
-#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
\ No newline at end of file
+#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
+
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index 080b7af..b629854 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.runtime.core.plugin;
 
 import java.util.List;
 import java.util.Properties;
-import java.util.ServiceLoader;
 
 import io.openmessaging.api.AsyncMessageListener;
 import io.openmessaging.api.Message;
@@ -35,6 +34,14 @@ public class MQConsumerWrapper extends MQWrapper {
 
     protected MeshMQPushConsumer meshMQPushConsumer;
 
+    public MQConsumerWrapper(String connectorPluginType) {
+        this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
+        if (meshMQPushConsumer == null) {
+            logger.error("can't load the meshMQPushConsumer plugin, please check.");
+            throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
+        }
+    }
+
     public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
         meshMQPushConsumer.subscribe(topic, listener);
     }
@@ -44,24 +51,10 @@ public class MQConsumerWrapper extends MQWrapper {
     }
 
     public synchronized void init(Properties keyValue) throws Exception {
-        meshMQPushConsumer = getMeshMQPushConsumer();
-        if (meshMQPushConsumer == null) {
-            logger.error("can't load the meshMQPushConsumer plugin, please check.");
-            throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
-        }
-
         meshMQPushConsumer.init(keyValue);
         inited.compareAndSet(false, true);
     }
 
-    private MeshMQPushConsumer getMeshMQPushConsumer() {
-        ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class);
-        if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
-            return meshMQPushConsumerServiceLoader.iterator().next();
-        }
-        return null;
-    }
-
     public synchronized void start() throws Exception {
         meshMQPushConsumer.start();
         started.compareAndSet(false, true);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
index 082ab3b..60fe842 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
@@ -34,27 +34,21 @@ public class MQProducerWrapper extends MQWrapper {
 
     protected MeshMQProducer meshMQProducer;
 
-    public synchronized void init(Properties keyValue) throws Exception {
-        if (inited.get()) {
-            return;
-        }
-
-        meshMQProducer = getSpiMeshMQProducer();
+    public MQProducerWrapper(String connectorPluginType) {
+        this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
         if (meshMQProducer == null) {
             logger.error("can't load the meshMQProducer plugin, please check.");
             throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
         }
-        meshMQProducer.init(keyValue);
-
-        inited.compareAndSet(false, true);
     }
 
-    private MeshMQProducer getSpiMeshMQProducer() {
-        ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class);
-        if (meshMQProducerServiceLoader.iterator().hasNext()) {
-            return meshMQProducerServiceLoader.iterator().next();
+    public synchronized void init(Properties keyValue) throws Exception {
+        if (inited.get()) {
+            return;
         }
-        return null;
+        meshMQProducer.init(keyValue);
+
+        inited.compareAndSet(false, true);
     }
 
     public synchronized void start() throws Exception {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
new file mode 100644
index 0000000..b114953
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.plugin;
+
+import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
+import org.apache.eventmesh.api.producer.MeshMQProducer;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+public class PluginFactory {
+
+    public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
+        return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
+    }
+
+    public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
+        return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
+    }
+
+    private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
+        return EventMeshExtensionFactory.getExtension(pluginType, pluginName);
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 8620e68..ef051b0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -66,13 +66,15 @@ public class EventMeshConsumer {
 
     private ConsumerGroupConf consumerGroupConf;
 
-    private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper persistentMqConsumer;
 
-    private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper broadcastMqConsumer;
 
     public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) {
         this.eventMeshHTTPServer = eventMeshHTTPServer;
         this.consumerGroupConf = consumerGroupConf;
+        this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
+        this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
     }
 
     private MessageHandler httpMessageHandler;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index cf41ca2..fe32180 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -69,7 +69,7 @@ public class EventMeshProducer {
         return true;
     }
 
-    protected MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
+    protected MQProducerWrapper mqProducerWrapper;
 
     public MQProducerWrapper getMqProducerWrapper() {
         return mqProducerWrapper;
@@ -85,7 +85,7 @@ public class EventMeshProducer {
 
         //TODO for defibus
         keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC);
-
+        mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.eventMeshConnectorPluginType);
         mqProducerWrapper.init(keyValue);
         inited.compareAndSet(false, true);
         logger.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 5829e49..310ea6d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -50,6 +50,7 @@ import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
+import org.apache.eventmesh.runtime.core.plugin.PluginFactory;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
@@ -96,14 +97,16 @@ public class ClientGroupWrapper {
 
     public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE);
 
-    private MQConsumerWrapper persistentMsgConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper persistentMsgConsumer;
 
-    private MQConsumerWrapper broadCastMsgConsumer = new MQConsumerWrapper();
+    private MQConsumerWrapper broadCastMsgConsumer;
 
     private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();
 
     public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
 
+    private MQProducerWrapper mqProducerWrapper;
+
     public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
                               EventMeshTCPServer eventMeshTCPServer,
                               DownstreamDispatchStrategy downstreamDispatchStrategy) {
@@ -115,6 +118,9 @@ public class ClientGroupWrapper {
         this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
         this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
         this.downstreamDispatchStrategy = downstreamDispatchStrategy;
+        this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+        this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
     }
 
     public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -163,8 +169,6 @@ public class ClientGroupWrapper {
         return true;
     }
 
-    private MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
-
     public MQProducerWrapper getMqProducerWrapper() {
         return mqProducerWrapper;
     }
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
index 96e054b..6aea9db 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
@@ -18,6 +18,7 @@
 package org.apache.eventmesh.spi;
 
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
 
 public enum EventMeshExtensionFactory {
     ;
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
index 740ecb3..89696e0 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
@@ -17,6 +17,9 @@
 
 package org.apache.eventmesh.spi;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
@@ -27,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
 public enum EventMeshExtensionLoader {
     ;
 
+    private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class);
+
     private static final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, Class<?>>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16);
 
     private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
@@ -54,7 +59,9 @@ public enum EventMeshExtensionLoader {
         }
         Class<?> aClass = extensionClassMap.get(extensionName);
         try {
-            EXTENSION_INSTANCE_CACHE.put(extensionName, aClass.newInstance());
+            Object extensionObj = aClass.newInstance();
+            logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName);
+            EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj);
         } catch (InstantiationException | IllegalAccessException e) {
             throw new ExtensionException("Extension initialize error", e);
         }
@@ -87,6 +94,7 @@ public enum EventMeshExtensionLoader {
                 String extensionClassStr = (String) extensionClass;
                 try {
                     Class<?> targetClass = Class.forName(extensionClassStr);
+                    logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass);
                     if (!extensionType.isAssignableFrom(targetClass)) {
                         throw new ExtensionException(
                                 String.format("class: %s is not subClass of %s", targetClass, extensionType));
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
index 9d3af8f..329f2bc 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
@@ -34,10 +34,15 @@ public class SyncRequestInstance {
     public static void main(String[] args) throws Exception {
 
         LiteProducer liteProducer = null;
+        String eventMeshIPPort = "127.0.0.1:10105";
+        String topic = "EventMesh.SyncRequestInstance";
         try {
-            String eventMeshIPPort = args[0];
-
-            final String topic = args[1];
+            if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
+                eventMeshIPPort = args[0];
+            }
+            if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
+                topic = args[1];
+            }
 
             if (StringUtils.isBlank(eventMeshIPPort)) {
                 // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
