You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/07/09 06:16:05 UTC
[incubator-eventmesh] branch 1.3.0 updated: [ISSUE #418]Refactor the plugin load code (#421)
This is an automated email from the ASF dual-hosted git repository.
chenguangsheng pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/1.3.0 by this push:
new 8ecbdcf [ISSUE #418]Refactor the plugin load code (#421)
8ecbdcf is described below
commit 8ecbdcf1e5a176a81a24fde5faba6a08726f76f5
Author: Wenjun Ruan <86...@qq.com>
AuthorDate: Fri Jul 9 14:15:59 2021 +0800
[ISSUE #418]Refactor the plugin load code (#421)
* [ISSUE #418]Refactor the plugin load code
* fix ut
---
.../eventmesh-runtime-quickstart.zh-CN.md | 16 ++--
.../instructions/eventmesh-runtime-quickstart.md | 16 ++--
docs/images/project-structure.png | Bin 63401 -> 54777 bytes
.../common/config/CommonConfiguration.java | 105 ++-------------------
.../src/test/resources/configuration.properties | 1 +
eventmesh-connector-api/build.gradle | 4 +-
eventmesh-connector-api/gradle.properties | 2 +-
.../eventmesh/api/consumer/MeshMQPushConsumer.java | 2 +
.../eventmesh/api/producer/MeshMQProducer.java | 2 +
...pache.eventmesh.api.consumer.MeshMQPushConsumer | 2 +-
...rg.apache.eventmesh.api.producer.MeshMQProducer | 2 +-
eventmesh-runtime/build.gradle | 4 +-
eventmesh-runtime/conf/eventmesh.properties | 5 +-
.../runtime/core/plugin/MQConsumerWrapper.java | 23 ++---
.../runtime/core/plugin/MQProducerWrapper.java | 22 ++---
.../runtime/core/plugin/PluginFactory.java | 39 ++++++++
.../protocol/http/consumer/EventMeshConsumer.java | 6 +-
.../protocol/http/producer/EventMeshProducer.java | 4 +-
.../tcp/client/group/ClientGroupWrapper.java | 12 ++-
.../eventmesh/spi/EventMeshExtensionFactory.java | 1 +
.../eventmesh/spi/EventMeshExtensionLoader.java | 10 +-
.../eventmesh/http/demo/SyncRequestInstance.java | 11 ++-
22 files changed, 126 insertions(+), 163 deletions(-)
diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
index adc9703..ecafc29 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
@@ -62,8 +62,9 @@ sh start.sh
- eventmesh-runtime : eventmesh运行时模块
- eventmesh-sdk-java : eventmesh java客户端sdk
- eventmesh-starter : eventmesh本地启动运行项目入口
+- eventmesh-spi : eventmesh SPI加载模块
-> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件
+> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
**2.3.2 配置VM启动参数**
@@ -75,18 +76,17 @@ sh start.sh
```
> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
-**2.3.3 配置build.gradle文件**
+**2.3.3 配置插件**
-通过修改dependencies,compile project 项来指定项目启动后加载的插件
+在`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件
-修改`eventmesh-starter`模块下面的`build.gradle`文件
+修改`confPath`目录下面的`eventMesh.properties`文件
-加载**RocketMQ**插件配置:
+加载**RocketMQ Connector**插件配置:
```java
-dependencies {
- compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
```
**2.3.4 启动运行**
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index 0536e5b..dd89795 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -62,9 +62,10 @@ Same with 1.2
- eventmesh-runtime : eventmesh runtime module
- eventmesh-sdk-java : eventmesh java client sdk
- eventmesh-starter : eventmesh project local start entry
+- eventmesh-spi : eventmesh SPI load module
-> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of
-related interface and implementation class under /main/resources/meta-inf/services in the corresponding module
+> ps: The loading of connector plugins follows the EventMesh SPI mechanism, so it is necessary to configure the mapping file of
+the related interface and implementation class under /main/resources/META-INF/eventmesh in the corresponding module
**2.3.2 Configure VM Options**
@@ -76,18 +77,17 @@ related interface and implementation class under /main/resources/meta-inf/servic
```
> ps: If you use Windows, you may need to replace the file separator to \
-**2.3.3 Configure build.gradle file**
+**2.3.3 Configure plugin**
-Specify the connector that will be loaded after the project start with updating compile project item in dependencies
+Specify the connector plugin to be loaded when the project starts by declaring it in `eventmesh.properties`
-update `build.gradle` file under the `eventmesh-starter` module
+Modify the `eventmesh.properties` file in the `confPath` directory
load **rocketmq connector** configuration:
```java
-dependencies {
- compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
```
**2.3.4 Run**
diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png
index 252a953..efc5249 100644
Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 08a44cb..1ae6b26 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -17,17 +17,10 @@
package org.apache.eventmesh.common.config;
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.IPUtil;
public class CommonConfiguration {
public String eventMeshEnv = "P";
@@ -35,7 +28,7 @@ public class CommonConfiguration {
public String eventMeshCluster = "LS";
public String eventMeshName = "";
public String sysID = "5477";
-
+ public String eventMeshConnectorPluginType = "rocketmq";
public String namesrvAddr = "";
public String clientUserName = "username";
@@ -84,8 +77,11 @@ public class CommonConfiguration {
eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
- eventMeshServerIp = getLocalAddr();
+ eventMeshServerIp = IPUtil.getLocalAddress();
}
+
+ eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
+ Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
}
}
@@ -105,94 +101,7 @@ public class CommonConfiguration {
public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
- }
-
- public static String getLocalAddr() {
- //priority of networkInterface when generating client ip
- String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
- ArrayList<String> preferList = new ArrayList<String>();
- for (String eth : priority.split("<")) {
- preferList.add(eth);
- }
- NetworkInterface preferNetworkInterface = null;
-
- try {
- Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
- while (enumeration1.hasMoreElements()) {
- final NetworkInterface networkInterface = enumeration1.nextElement();
- if (!preferList.contains(networkInterface.getName())) {
- continue;
- } else if (preferNetworkInterface == null) {
- preferNetworkInterface = networkInterface;
- }
- //get the networkInterface that has higher priority
- else if (preferList.indexOf(networkInterface.getName())
- > preferList.indexOf(preferNetworkInterface.getName())) {
- preferNetworkInterface = networkInterface;
- }
- }
-
- // Traversal Network interface to get the first non-loopback and non-private address
- ArrayList<String> ipv4Result = new ArrayList<String>();
- ArrayList<String> ipv6Result = new ArrayList<String>();
-
- if (preferNetworkInterface != null) {
- final Enumeration<InetAddress> en = preferNetworkInterface.getInetAddresses();
- getIpResult(ipv4Result, ipv6Result, en);
- } else {
- Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
- while (enumeration.hasMoreElements()) {
- final NetworkInterface networkInterface = enumeration.nextElement();
- final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
- getIpResult(ipv4Result, ipv6Result, en);
- }
- }
-
- // prefer ipv4
- if (!ipv4Result.isEmpty()) {
- for (String ip : ipv4Result) {
- if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
- continue;
- }
-
- return ip;
- }
- return ipv4Result.get(ipv4Result.size() - 1);
- } else if (!ipv6Result.isEmpty()) {
- return ipv6Result.get(0);
- }
- //If failed to find,fall back to localhost
- final InetAddress localHost = InetAddress.getLocalHost();
- return normalizeHostAddress(localHost);
- } catch (SocketException e) {
- e.printStackTrace();
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
-
- return null;
- }
-
- public static String normalizeHostAddress(final InetAddress localHost) {
- if (localHost instanceof Inet6Address) {
- return "[" + localHost.getHostAddress() + "]";
- } else {
- return localHost.getHostAddress();
- }
- }
-
- private static void getIpResult(ArrayList<String> ipv4Result, ArrayList<String> ipv6Result,
- Enumeration<InetAddress> en) {
- while (en.hasMoreElements()) {
- final InetAddress address = en.nextElement();
- if (!address.isLoopbackAddress()) {
- if (address instanceof Inet6Address) {
- ipv6Result.add(normalizeHostAddress(address));
- } else {
- ipv4Result.add(normalizeHostAddress(address));
- }
- }
- }
+ public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
}
}
\ No newline at end of file
diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties
index d7c5096..76f29f2 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-common/src/test/resources/configuration.properties
@@ -21,3 +21,4 @@ eventMesh.sysid=3
eventMesh.server.cluster=value4
eventMesh.server.name=value5
eventMesh.server.hostIp=value6
+eventMesh.connector.plugin.type=rocketmq
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-api/build.gradle
index 2d1205d..157048e 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-api/build.gradle
@@ -20,6 +20,6 @@ List open_message = [
]
dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+ implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
+ testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
}
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-api/gradle.properties
index ae30087..9d1744e 100644
--- a/eventmesh-connector-api/gradle.properties
+++ b/eventmesh-connector-api/gradle.properties
@@ -16,6 +16,6 @@
#
group=org.apache.eventmesh
version=1.2.0-SNAPSHOT
-jdk=1.7
+jdk=1.8
mavenUserName=
mavenPassword=
\ No newline at end of file
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
index 5e60e0e..4ac1edb 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
@@ -25,7 +25,9 @@ import io.openmessaging.api.Consumer;
import io.openmessaging.api.Message;
import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.spi.EventMeshSPI;
+@EventMeshSPI
public interface MeshMQPushConsumer extends Consumer {
void init(Properties keyValue) throws Exception;
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
index 82ca583..c717385 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
@@ -24,7 +24,9 @@ import io.openmessaging.api.Producer;
import io.openmessaging.api.SendCallback;
import org.apache.eventmesh.api.RRCallback;
+import org.apache.eventmesh.spi.EventMeshSPI;
+@EventMeshSPI
public interface MeshMQProducer extends Producer {
void init(Properties properties) throws Exception;
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
similarity index 90%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
rename to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
index c98880a..0df2e28 100644
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
+++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
+rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
similarity index 90%
rename from eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
rename to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
index 28907ca..ef4959d 100644
--- a/eventmesh-connector-rocketmq/src/main/resources/META-INF/services/org.apache.eventmesh.api.producer.MeshMQProducer
+++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
+rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 09dd79f..e5bb065 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -31,6 +31,6 @@ List open_message = [
dependencies {
- implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
- testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
+ implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
+ testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
}
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 035b950..45fc193 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
-#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
\ No newline at end of file
+#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
+
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index 080b7af..b629854 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.runtime.core.plugin;
import java.util.List;
import java.util.Properties;
-import java.util.ServiceLoader;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
@@ -35,6 +34,14 @@ public class MQConsumerWrapper extends MQWrapper {
protected MeshMQPushConsumer meshMQPushConsumer;
+ public MQConsumerWrapper(String connectorPluginType) {
+ this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
+ if (meshMQPushConsumer == null) {
+ logger.error("can't load the meshMQPushConsumer plugin, please check.");
+ throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
+ }
+ }
+
public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
}
@@ -44,24 +51,10 @@ public class MQConsumerWrapper extends MQWrapper {
}
public synchronized void init(Properties keyValue) throws Exception {
- meshMQPushConsumer = getMeshMQPushConsumer();
- if (meshMQPushConsumer == null) {
- logger.error("can't load the meshMQPushConsumer plugin, please check.");
- throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
- }
-
meshMQPushConsumer.init(keyValue);
inited.compareAndSet(false, true);
}
- private MeshMQPushConsumer getMeshMQPushConsumer() {
- ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class);
- if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
- return meshMQPushConsumerServiceLoader.iterator().next();
- }
- return null;
- }
-
public synchronized void start() throws Exception {
meshMQPushConsumer.start();
started.compareAndSet(false, true);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
index 082ab3b..60fe842 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
@@ -34,27 +34,21 @@ public class MQProducerWrapper extends MQWrapper {
protected MeshMQProducer meshMQProducer;
- public synchronized void init(Properties keyValue) throws Exception {
- if (inited.get()) {
- return;
- }
-
- meshMQProducer = getSpiMeshMQProducer();
+ public MQProducerWrapper(String connectorPluginType) {
+ this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
if (meshMQProducer == null) {
logger.error("can't load the meshMQProducer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
}
- meshMQProducer.init(keyValue);
-
- inited.compareAndSet(false, true);
}
- private MeshMQProducer getSpiMeshMQProducer() {
- ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class);
- if (meshMQProducerServiceLoader.iterator().hasNext()) {
- return meshMQProducerServiceLoader.iterator().next();
+ public synchronized void init(Properties keyValue) throws Exception {
+ if (inited.get()) {
+ return;
}
- return null;
+ meshMQProducer.init(keyValue);
+
+ inited.compareAndSet(false, true);
}
public synchronized void start() throws Exception {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
new file mode 100644
index 0000000..b114953
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.plugin;
+
+import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
+import org.apache.eventmesh.api.producer.MeshMQProducer;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+public class PluginFactory {
+
+ public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
+ return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
+ }
+
+ public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
+ return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
+ }
+
+ private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
+ return EventMeshExtensionFactory.getExtension(pluginType, pluginName);
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 8620e68..ef051b0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -66,13 +66,15 @@ public class EventMeshConsumer {
private ConsumerGroupConf consumerGroupConf;
- private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper persistentMqConsumer;
- private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper broadcastMqConsumer;
public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.consumerGroupConf = consumerGroupConf;
+ this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
+ this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
}
private MessageHandler httpMessageHandler;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index cf41ca2..fe32180 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -69,7 +69,7 @@ public class EventMeshProducer {
return true;
}
- protected MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
+ protected MQProducerWrapper mqProducerWrapper;
public MQProducerWrapper getMqProducerWrapper() {
return mqProducerWrapper;
@@ -85,7 +85,7 @@ public class EventMeshProducer {
//TODO for defibus
keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC);
-
+ mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.eventMeshConnectorPluginType);
mqProducerWrapper.init(keyValue);
inited.compareAndSet(false, true);
logger.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 5829e49..310ea6d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -50,6 +50,7 @@ import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
+import org.apache.eventmesh.runtime.core.plugin.PluginFactory;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
@@ -96,14 +97,16 @@ public class ClientGroupWrapper {
public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE);
- private MQConsumerWrapper persistentMsgConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper persistentMsgConsumer;
- private MQConsumerWrapper broadCastMsgConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper broadCastMsgConsumer;
private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
+ private MQProducerWrapper mqProducerWrapper;
+
public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
@@ -115,6 +118,9 @@ public class ClientGroupWrapper {
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
+ this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
}
public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -163,8 +169,6 @@ public class ClientGroupWrapper {
return true;
}
- private MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
-
public MQProducerWrapper getMqProducerWrapper() {
return mqProducerWrapper;
}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
index 96e054b..6aea9db 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
@@ -18,6 +18,7 @@
package org.apache.eventmesh.spi;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
public enum EventMeshExtensionFactory {
;
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
index 740ecb3..89696e0 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
@@ -17,6 +17,9 @@
package org.apache.eventmesh.spi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
@@ -27,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
public enum EventMeshExtensionLoader {
;
+ private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class);
+
private static final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, Class<?>>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16);
private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
@@ -54,7 +59,9 @@ public enum EventMeshExtensionLoader {
}
Class<?> aClass = extensionClassMap.get(extensionName);
try {
- EXTENSION_INSTANCE_CACHE.put(extensionName, aClass.newInstance());
+ Object extensionObj = aClass.newInstance();
+ logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName);
+ EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj);
} catch (InstantiationException | IllegalAccessException e) {
throw new ExtensionException("Extension initialize error", e);
}
@@ -87,6 +94,7 @@ public enum EventMeshExtensionLoader {
String extensionClassStr = (String) extensionClass;
try {
Class<?> targetClass = Class.forName(extensionClassStr);
+ logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass);
if (!extensionType.isAssignableFrom(targetClass)) {
throw new ExtensionException(
String.format("class: %s is not subClass of %s", targetClass, extensionType));
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
index 9d3af8f..329f2bc 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
@@ -34,10 +34,15 @@ public class SyncRequestInstance {
public static void main(String[] args) throws Exception {
LiteProducer liteProducer = null;
+ String eventMeshIPPort = "127.0.0.1:10105";
+ String topic = "EventMesh.SyncRequestInstance";
try {
- String eventMeshIPPort = args[0];
-
- final String topic = args[1];
+ if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
+ eventMeshIPPort = args[0];
+ }
+ if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
+ topic = args[1];
+ }
if (StringUtils.isBlank(eventMeshIPPort)) {
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org