You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:27 UTC

[rocketmq] 03/14: Modify Serviceloader implementation

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c668abb8cff8da1f040a89fd0043893a81b6aece
Author: duhengforever <du...@gmail.com>
AuthorDate: Wed Dec 12 20:55:42 2018 +0800

    Modify Serviceloader implementation
---
 .../apache/rocketmq/broker/BrokerController.java   |  8 +--
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  4 +-
 .../rocketmq/broker/util/ServiceProviderTest.java  |  1 +
 .../apache/rocketmq/namesrv/NamesrvController.java |  2 +-
 .../apache/rocketmq/remoting/RemotingClient.java   |  2 +-
 .../rocketmq/remoting/RemotingClientFactory.java   | 16 +++---
 .../apache/rocketmq/remoting/RemotingServer.java   |  2 +-
 .../rocketmq/remoting/RemotingServerFactory.java   | 24 ++++-----
 .../rocketmq/remoting/common/RemotingUtil.java     |  2 +
 .../remoting/transport/http2/Http2ClientImpl.java  |  3 +-
 .../remoting/transport/http2/Http2ServerImpl.java  |  3 +-
 .../transport/rocketmq/NettyRemotingClient.java    |  3 +-
 .../transport/rocketmq/NettyRemotingServer.java    |  3 +-
 .../rocketmq/remoting/util/RemotingUtil.java       |  6 ---
 .../rocketmq/remoting/util/ServiceProvider.java    | 58 +++++++++++++---------
 15 files changed, 70 insertions(+), 67 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 024ce67..1cbd39c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -246,14 +246,14 @@ public class BrokerController {
         result = result && this.messageStore.load();
 
         if (result) {
-            this.remotingServer = RemotingServerFactory.getRemotingServer();
+            this.remotingServer = RemotingServerFactory.createInstance();
             this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
 //            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
             NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
-            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
-//            this.fastRemotingServer = RemotingServerFactory.getRemotingServer();
-//            this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+//            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+            this.fastRemotingServer = RemotingServerFactory.createInstance();
+            this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService);
 
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                 this.brokerConfig.getSendMessageThreadPoolNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index d157021..9edfcb8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -71,9 +71,7 @@ public class BrokerOuterAPI {
     }
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
-        this.remotingClient = RemotingClientFactory.getClient();
-        this.remotingClient.init(nettyClientConfig, null);
-//        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
         this.remotingClient.registerRPCHook(rpcHook);
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
index 22228a6..1437ffc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util;
 
 import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 6faccf7..a329e70 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -77,7 +77,7 @@ public class NamesrvController {
 
         this.kvConfigManager.load();
 
-        this.remotingServer = RemotingServerFactory.getRemotingServer();
+        this.remotingServer = RemotingServerFactory.createInstance();
         this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
 //        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index ab4e914..88bca57 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -53,5 +53,5 @@ public interface RemotingClient extends RemotingService {
 
     boolean isChannelWritable(final String addr);
 
-    void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
+    RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index 5e87ec9..a766625 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -4,7 +4,7 @@ import java.util.Map;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
 
 public class RemotingClientFactory {
@@ -13,21 +13,21 @@ public class RemotingClientFactory {
     private RemotingClientFactory() {
     }
 
-    private static Map<String, RemotingClient> clients;
+    private static Map<String, String> paths;
 
     private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
 
     static {
         log.info("begin load client");
-        clients = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class);
-        log.info("end load client, size:{}", clients.size());
+        paths = ServiceProvider.loadPath(CLIENT_LOCATION);
+        log.info("end load client, size:{}", paths.size());
     }
 
-    public static RemotingClient getClient(String protocolType) {
-        return clients.get(protocolType);
+    public static RemotingClient createInstance(String protocol) { // yields null when protocol has no entry in paths
+        return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class);
     }
 
-    public static RemotingClient getClient() {
-        return clients.get(RemotingUtil.DEFAULT_PROTOCOL);
+    public static RemotingClient createInstance() { // same lookup as createInstance(String), keyed by the default protocol
+        return createInstance(RemotingUtil.DEFAULT_PROTOCOL);
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 6a5fb91..0d5ff38 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -51,6 +51,6 @@ public interface RemotingServer extends RemotingService {
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException;
 
-    void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
+    RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
 
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index e7a7700..125d4e0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -4,7 +4,7 @@ import java.util.Map;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
 
 public class RemotingServerFactory {
@@ -14,27 +14,21 @@ public class RemotingServerFactory {
     private RemotingServerFactory() {
     }
 
-    private static Map<String, RemotingServer> servers;
-
-//    private static Map<String/*protocolType*/, String/*path*/ >
+    private static Map<String, String> protocolPathMap;
 
     private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
 
     static {
         log.info("begin load server");
-        servers = ServiceProvider.load(SERVER_LOCATION, RemotingClient.class);
-        log.info("end load server, size:{}", servers.size());
+        protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
+        log.info("end load server, size:{}", protocolPathMap.size());
     }
 
-    public static RemotingServer getRemotingServer() {
-        return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL);
-    }
 
-    public static RemotingServer getRemotingServer(String protocolType) {
-        return servers.get(protocolType);
+    public static RemotingServer createInstance(String protocol) { // yields null when protocol has no entry in protocolPathMap
+        return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingServer.class); // expected type is the server interface
+    }
+    public static RemotingServer createInstance() { // same lookup, keyed by RemotingUtil.DEFAULT_PROTOCOL
+        return createInstance(RemotingUtil.DEFAULT_PROTOCOL);
     }
-
-//    public static RemotingServer createNewInstance(String protocolType){
-//        return ServiceProvider.load()
-//    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 3da3a18..88008ab 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -41,6 +41,8 @@ public class RemotingUtil {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
     private static boolean isLinuxPlatform = false;
     private static boolean isWindowsPlatform = false;
+    public static final String DEFAULT_PROTOCOL = "http2";
+    public static final String REMOTING_CHARSET = "UTF-8";
 
     static {
         if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
index b294958..65504d1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -64,7 +64,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
     }
 
     @Override
-    public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
+    public RemotingClient init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
         this.nettyClientConfig = clientConfig;
         this.channelEventListener = channelEventListener;
         this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
@@ -75,6 +75,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
             ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
         buildSslContext();
+        return this;
     }
 
     private void buildSslContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
index 02c4fb6..6ff3d90 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -74,7 +74,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
     }
 
     @Override
-    public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+    public RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
         super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
         this.serverConfig = nettyServerConfig;
@@ -100,6 +100,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
             ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
         this.port = nettyServerConfig.getListenPort();
         buildHttp2SslContext();
+        return this;
     }
 
     private void buildHttp2SslContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index 14666d2..4e691f1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -86,7 +86,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
     }
 
     @Override
-    public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
+    public RemotingClient init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
         this.nettyClientConfig = nettyClientConfig;
         this.channelEventListener = channelEventListener;
         this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
@@ -107,6 +107,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
                 throw new RuntimeException("Failed to create SSLContext", e);
             }
         }
+        return this;
     }
 
     @Override
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 9e7d419..d167b49 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -95,7 +95,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
     }
 
     @Override
-    public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
+    public RemotingServer init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
         this.nettyServerConfig = serverConfig;
         super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
@@ -126,6 +126,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
             ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
         loadSslContext();
+        return this;
     }
 
     public void loadSslContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
deleted file mode 100644
index ccd037f..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.rocketmq.remoting.util;
-
-public class RemotingUtil {
-    public static final String REMOTING_CHARSET = "UTF-8";
-    public static final String DEFAULT_PROTOCOL = "rocketmq";
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
index 24fa721..33c8312 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
@@ -15,6 +15,7 @@ package org.apache.rocketmq.remoting.util;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -91,11 +92,10 @@ public class ServiceProvider {
         }
     }
 
-    public static <T> Map<String, T> load(String path, Class<?> clazz) {
-        LOG.info("Looking for a resource file of name [{}] ...", path);
-        Map<String, T> services = new ConcurrentHashMap<String, T>();
+    public static Map<String, String> loadPath(String path) {
+        LOG.info("Load path looking for a resource file of name [{}] ...", path);
+        Map<String, String> pathMap = new HashMap<String, String>();
         try {
-
             final InputStream is = getResourceAsStream(getContextClassLoader(), path);
             if (is != null) {
                 BufferedReader reader;
@@ -106,29 +106,35 @@ public class ServiceProvider {
                 }
                 String serviceName = reader.readLine();
                 while (serviceName != null && !"".equals(serviceName)) {
-                    LOG.info(
-                        "Creating an instance as specified by file {} which was present in the path of the context classloader.",
-                        path);
                     String[] service = serviceName.split("=");
-                    if (service.length != 2) {
-                        continue;
-                    } else {
-                        if (services.containsKey(service[0])) {
+                    if (service.length == 2) {
+                        if (pathMap.containsKey(service[0])) {
                             continue;
                         } else {
-                            LOG.info("Begin to load protocol: " + service[0]);
-                            services.put(service[0], (T) initService(getContextClassLoader(), service[1], clazz));
+                            pathMap.put(service[0], service[1]);
                         }
+                    } else {
+                        continue;
                     }
                     serviceName = reader.readLine();
                 }
                 reader.close();
-            } else {
-                // is == null
-                LOG.warn("No resource file with name [{}] found.", path);
             }
-        } catch (Exception e) {
-            LOG.error("Error occured when looking for resource file " + path, e);
+        } catch (Exception ex) {
+            LOG.error("Error occured when looking for resource file " + path, ex);
+        }
+        return pathMap;
+    }
+
+    public static <T> Map<String, T> load(String path, Class<?> clazz) {
+        LOG.info("Load map is looking for a resource file of name [{}] ...", path);
+        Map<String, T> services = new HashMap<String, T>();
+        // Instantiate every class named in the resource file; names that yield null are left out of the result.
+        for (Map.Entry<String, String> provider : loadPath(path).entrySet()) {
+            T service = createInstance(provider.getValue(), clazz);
+            if (service != null) {
+                services.put(provider.getKey(), service);
+            }
+        }
         return services;
     }
@@ -145,12 +151,7 @@ public class ServiceProvider {
                 }
                 String serviceName = reader.readLine();
                 reader.close();
-                if (serviceName != null && !"".equals(serviceName)) {
-                    return initService(getContextClassLoader(), serviceName, clazz);
-                } else {
-                    LOG.warn("ServiceName is empty!");
-                    return null;
-                }
+                return createInstance(serviceName, clazz);
             } catch (Exception e) {
                 LOG.warn("Error occurred when looking for resource file " + name, e);
             }
@@ -158,6 +159,15 @@ public class ServiceProvider {
         return null;
     }
 
+    public static <T> T createInstance(String serviceName, Class<?> clazz) {
+        // Guard: a null or empty class name cannot be instantiated, so warn and hand back null.
+        if (serviceName == null || "".equals(serviceName)) {
+            LOG.warn("ServiceName is empty!");
+            return null;
+        }
+        return initService(getContextClassLoader(), serviceName, clazz);
+    }
+
     protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
         Class<?> serviceClazz = null;
         try {