You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:27 UTC
[rocketmq] 03/14: Modify Serviceloader implementation
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit c668abb8cff8da1f040a89fd0043893a81b6aece
Author: duhengforever <du...@gmail.com>
AuthorDate: Wed Dec 12 20:55:42 2018 +0800
Modify Serviceloader implementation
---
.../apache/rocketmq/broker/BrokerController.java | 8 +--
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 4 +-
.../rocketmq/broker/util/ServiceProviderTest.java | 1 +
.../apache/rocketmq/namesrv/NamesrvController.java | 2 +-
.../apache/rocketmq/remoting/RemotingClient.java | 2 +-
.../rocketmq/remoting/RemotingClientFactory.java | 16 +++---
.../apache/rocketmq/remoting/RemotingServer.java | 2 +-
.../rocketmq/remoting/RemotingServerFactory.java | 24 ++++-----
.../rocketmq/remoting/common/RemotingUtil.java | 2 +
.../remoting/transport/http2/Http2ClientImpl.java | 3 +-
.../remoting/transport/http2/Http2ServerImpl.java | 3 +-
.../transport/rocketmq/NettyRemotingClient.java | 3 +-
.../transport/rocketmq/NettyRemotingServer.java | 3 +-
.../rocketmq/remoting/util/RemotingUtil.java | 6 ---
.../rocketmq/remoting/util/ServiceProvider.java | 58 +++++++++++++---------
15 files changed, 70 insertions(+), 67 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 024ce67..1cbd39c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -246,14 +246,14 @@ public class BrokerController {
result = result && this.messageStore.load();
if (result) {
- this.remotingServer = RemotingServerFactory.getRemotingServer();
+ this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
- this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
-// this.fastRemotingServer = RemotingServerFactory.getRemotingServer();
-// this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+// this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+ this.fastRemotingServer = RemotingServerFactory.createInstance();
+ this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index d157021..9edfcb8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -71,9 +71,7 @@ public class BrokerOuterAPI {
}
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
- this.remotingClient = RemotingClientFactory.getClient();
- this.remotingClient.init(nettyClientConfig, null);
-// this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+ this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
this.remotingClient.registerRPCHook(rpcHook);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
index 22228a6..1437ffc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 6faccf7..a329e70 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -77,7 +77,7 @@ public class NamesrvController {
this.kvConfigManager.load();
- this.remotingServer = RemotingServerFactory.getRemotingServer();
+ this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index ab4e914..88bca57 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -53,5 +53,5 @@ public interface RemotingClient extends RemotingService {
boolean isChannelWritable(final String addr);
- void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
+ RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index 5e87ec9..a766625 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -4,7 +4,7 @@ import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingClientFactory {
@@ -13,21 +13,21 @@ public class RemotingClientFactory {
private RemotingClientFactory() {
}
- private static Map<String, RemotingClient> clients;
+ private static Map<String, String> paths;
private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
static {
log.info("begin load client");
- clients = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class);
- log.info("end load client, size:{}", clients.size());
+ paths = ServiceProvider.loadPath(CLIENT_LOCATION);
+ log.info("end load client, size:{}", paths.size());
}
- public static RemotingClient getClient(String protocolType) {
- return clients.get(protocolType);
+ public static RemotingClient createInstance(String protocol) {
+ return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class);
}
- public static RemotingClient getClient() {
- return clients.get(RemotingUtil.DEFAULT_PROTOCOL);
+ public static RemotingClient createInstance() {
+ return ServiceProvider.createInstance(paths.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingClient.class);
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 6a5fb91..0d5ff38 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -51,6 +51,6 @@ public interface RemotingServer extends RemotingService {
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
- void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
+ RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index e7a7700..125d4e0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -4,7 +4,7 @@ import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingServerFactory {
@@ -14,27 +14,21 @@ public class RemotingServerFactory {
private RemotingServerFactory() {
}
- private static Map<String, RemotingServer> servers;
-
-// private static Map<String/*protocolType*/, String/*path*/ >
+ private static Map<String, String> protocolPathMap;
private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
static {
log.info("begin load server");
- servers = ServiceProvider.load(SERVER_LOCATION, RemotingClient.class);
- log.info("end load server, size:{}", servers.size());
+ protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
+ log.info("end load server, size:{}", protocolPathMap.size());
}
- public static RemotingServer getRemotingServer() {
- return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL);
- }
- public static RemotingServer getRemotingServer(String protocolType) {
- return servers.get(protocolType);
+ /**
+ * Creates a new {@link RemotingServer} for the given protocol.
+ *
+ * @param protocol key into the protocol-to-class-name map loaded from SERVER_LOCATION
+ * @return the instance produced by ServiceProvider.createInstance, or null when no
+ * class name is mapped for the protocol (ServiceProvider.createInstance warns and returns null
+ * for a null or empty name)
+ */
+ public static RemotingServer createInstance(String protocol) {
+ // The expected type must be RemotingServer, the same type the no-arg overload passes.
+ return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingServer.class);
+ }
+ public static RemotingServer createInstance() {
+ return ServiceProvider.createInstance(protocolPathMap.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingServer.class);
}
-
-// public static RemotingServer createNewInstance(String protocolType){
-// return ServiceProvider.load()
-// }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 3da3a18..88008ab 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -41,6 +41,8 @@ public class RemotingUtil {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;
+ public static final String DEFAULT_PROTOCOL = "http2";
+ public static final String REMOTING_CHARSET = "UTF-8";
static {
if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
index b294958..65504d1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -64,7 +64,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
}
@Override
- public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
+ public RemotingClient init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = clientConfig;
this.channelEventListener = channelEventListener;
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
@@ -75,6 +75,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
buildSslContext();
+ return this;
}
private void buildSslContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
index 02c4fb6..6ff3d90 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -74,7 +74,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
}
@Override
- public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+ public RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.serverConfig = nettyServerConfig;
@@ -100,6 +100,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
this.port = nettyServerConfig.getListenPort();
buildHttp2SslContext();
+ return this;
}
private void buildHttp2SslContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index 14666d2..4e691f1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -86,7 +86,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
}
@Override
- public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
+ public RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
@@ -107,6 +107,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
throw new RuntimeException("Failed to create SSLContext", e);
}
}
+ return this;
}
@Override
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 9e7d419..d167b49 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -95,7 +95,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
}
@Override
- public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
+ public RemotingServer init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
@@ -126,6 +126,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
loadSslContext();
+ return this;
}
public void loadSslContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
deleted file mode 100644
index ccd037f..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.rocketmq.remoting.util;
-
-public class RemotingUtil {
- public static final String REMOTING_CHARSET = "UTF-8";
- public static final String DEFAULT_PROTOCOL = "rocketmq";
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
index 24fa721..33c8312 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
@@ -15,6 +15,7 @@ package org.apache.rocketmq.remoting.util;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.logging.InternalLogger;
@@ -91,11 +92,10 @@ public class ServiceProvider {
}
}
- public static <T> Map<String, T> load(String path, Class<?> clazz) {
- LOG.info("Looking for a resource file of name [{}] ...", path);
- Map<String, T> services = new ConcurrentHashMap<String, T>();
+ public static Map<String, String> loadPath(String path) {
+ LOG.info("Load path looking for a resource file of name [{}] ...", path);
+ Map<String, String> pathMap = new HashMap<String, String>();
try {
-
final InputStream is = getResourceAsStream(getContextClassLoader(), path);
if (is != null) {
BufferedReader reader;
@@ -106,29 +106,35 @@ public class ServiceProvider {
}
String serviceName = reader.readLine();
while (serviceName != null && !"".equals(serviceName)) {
- LOG.info(
- "Creating an instance as specified by file {} which was present in the path of the context classloader.",
- path);
String[] service = serviceName.split("=");
- if (service.length != 2) {
- continue;
- } else {
- if (services.containsKey(service[0])) {
+ if (service.length == 2) {
+ if (pathMap.containsKey(service[0])) {
continue;
} else {
- LOG.info("Begin to load protocol: " + service[0]);
- services.put(service[0], (T) initService(getContextClassLoader(), service[1], clazz));
+ pathMap.put(service[0], service[1]);
}
+ } else {
+ continue;
}
serviceName = reader.readLine();
}
reader.close();
- } else {
- // is == null
- LOG.warn("No resource file with name [{}] found.", path);
}
- } catch (Exception e) {
- LOG.error("Error occured when looking for resource file " + path, e);
+ } catch (Exception ex) {
+ LOG.error("Error occured when looking for resource file " + path, ex);
+ }
+ return pathMap;
+ }
+
+ public static <T> Map<String, T> load(String path, Class<?> clazz) {
+ LOG.info("Load map is looking for a resource file of name [{}] ...", path);
+ Map<String, T> services = new HashMap<String, T>();
+ Map<String, String> pathMaps = loadPath(path);
+ for (Map.Entry<String, String> entry : pathMaps.entrySet()) {
+ T instance = (T) createInstance(entry.getValue(), clazz);
+ if (instance != null && !services.containsKey(entry.getKey())) {
+ services.put(entry.getKey(), instance);
+ }
}
return services;
}
@@ -145,12 +151,7 @@ public class ServiceProvider {
}
String serviceName = reader.readLine();
reader.close();
- if (serviceName != null && !"".equals(serviceName)) {
- return initService(getContextClassLoader(), serviceName, clazz);
- } else {
- LOG.warn("ServiceName is empty!");
- return null;
- }
+ return createInstance(serviceName, clazz);
} catch (Exception e) {
LOG.warn("Error occurred when looking for resource file " + name, e);
}
@@ -158,6 +159,15 @@ public class ServiceProvider {
return null;
}
+ public static <T> T createInstance(String serviceName, Class<?> clazz) {
+ if (serviceName != null && !"".equals(serviceName)) {
+ return initService(getContextClassLoader(), serviceName, clazz);
+ } else {
+ LOG.warn("ServiceName is empty!");
+ return null;
+ }
+ }
+
protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
Class<?> serviceClazz = null;
try {