You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/20 12:37:42 UTC

[3/3] incubator-rocketmq git commit: Polish thread pool create method

Polish thread pool create method


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6593294f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6593294f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6593294f

Branch: refs/heads/rocketmq5
Commit: 6593294f07dec7039668e7f5807f530433c3bbda
Parents: 489b1d8
Author: yukon <yu...@apache.org>
Authored: Wed Sep 20 17:39:46 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Sep 20 20:37:15 2017 +0800

----------------------------------------------------------------------
 remoting-core/pom.xml                           |  2 +-
 .../rocketmq/remoting/external/ThreadUtils.java | 26 +++++++++++++-------
 .../impl/netty/NettyRemotingAbstract.java       |  8 ++----
 .../rpc/impl/client/SimpleClientImpl.java       | 10 +++-----
 .../rpc/impl/server/SimpleServerImpl.java       | 10 +++-----
 .../rpc/impl/service/RpcInstanceAbstract.java   | 18 ++++----------
 .../rpc/impl/service/RpcProxyCommon.java        | 19 +++++---------
 7 files changed, 37 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/pom.xml
----------------------------------------------------------------------
diff --git a/remoting-core/pom.xml b/remoting-core/pom.xml
index 702b826..997011b 100644
--- a/remoting-core/pom.xml
+++ b/remoting-core/pom.xml
@@ -77,7 +77,7 @@
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>
-                <version>4.1.6.Final</version>
+                <version>4.1.15.Final</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
index 1a80d20..5a50089 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -43,25 +44,32 @@ public final class ThreadUtils {
         int maximumPoolSize,
         long keepAliveTime,
         TimeUnit unit,
-        BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
-        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+        BlockingQueue<Runnable> workQueue,
+        String processName, boolean isDaemon) {
+        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
+    }
+
+    public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
+        return new ThreadPoolExecutor(
+            nThreads,
+            nThreads,
+            0,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(workQueueCapacity),
+            newGenericThreadFactory(processName, isDaemon));
     }
 
     public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
-        return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
+        return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
     }
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
-        return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
+        return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
     }
 
     public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
         boolean isDaemon) {
-        return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
-    }
-
-    public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
-        return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+        return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon));
     }
 
     public static ThreadFactory newGenericThreadFactory(String processName) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index a5c2118..82b17f4 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -92,13 +92,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
         this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
         this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
-        this.publicExecutor = ThreadUtils.newThreadPoolExecutor(
+        this.publicExecutor = ThreadUtils.newFixedThreadPool(
             clientConfig.getClientAsyncCallbackExecutorThreads(),
-            clientConfig.getClientAsyncCallbackExecutorThreads(),
-            60,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>(10000),
-            "PublicExecutor", true);
+            10000, "Remoting-PublicExecutor", true);
         this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
index 4483ca3..35931b2 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.client;
 
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -48,13 +47,10 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
         super(rpcCommonConfig);
         this.remotingClient = remotingClient;
         this.rpcCommonConfig = rpcCommonConfig;
-        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+        this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
             rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
-            "clientCallServiceThread", true);
+            rpcCommonConfig.getServiceThreadBlockQueueSize(),
+            "RPC-ClientCallServiceThread", true);
     }
 
     public void initialize() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
index 1fdda49..469e0c7 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.server;
 
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -40,13 +39,10 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe
     public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
         this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
         this.rpcCommonConfig = remotingConfig;
-        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+        this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
             rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
-            "serverCallServiceThread", true);
+            remotingConfig.getServiceThreadBlockQueueSize(),
+            "RPC-ServerCallServiceThread", true);
     }
 
     public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
index 8c0ddf2..7ece4a8 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
@@ -18,8 +18,6 @@
 package org.apache.rocketmq.rpc.impl.service;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.rpc.impl.command.RpcRequestCode;
@@ -29,15 +27,13 @@ import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl;
 import org.apache.rocketmq.rpc.impl.metrics.ThreadStats;
 import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor;
 
-import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory;
-
 public abstract class RpcInstanceAbstract extends RpcProxyCommon {
     protected final RpcRequestProcessor rpcRequestProcessor;
     protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>();
     protected final RpcCommonConfig rpcCommonConfig;
     protected ThreadStats threadStats;
     private DefaultServiceAPIImpl defaultServiceAPI;
-    private ThreadPoolExecutor invokeServiceThreadPool;
+    private ExecutorService invokeServiceThreadPool;
 
     public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) {
         super(rpcCommonConfig);
@@ -45,13 +41,9 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
         this.rpcCommonConfig = rpcCommonConfig;
         this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
 
-        this.invokeServiceThreadPool = new ThreadPoolExecutor(
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+        this.invokeServiceThreadPool = ThreadUtils.newFixedThreadPool(
             rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            60,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
-            newThreadFactory("rpcInvokeServiceThread", true));
+            rpcCommonConfig.getServiceThreadBlockQueueSize(),"RPC-InvokeServiceThread", true);
 
     }
 
@@ -81,11 +73,11 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
 
     public abstract void registerServiceListener();
 
-    public ThreadPoolExecutor getInvokeServiceThreadPool() {
+    public ExecutorService getInvokeServiceThreadPool() {
         return invokeServiceThreadPool;
     }
 
-    public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) {
+    public void setInvokeServiceThreadPool(ExecutorService invokeServiceThreadPool) {
         this.invokeServiceThreadPool = invokeServiceThreadPool;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
index 2487f79..ac8c208 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
@@ -25,7 +25,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -68,20 +67,14 @@ public abstract class RpcProxyCommon {
     public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) {
         this.rpcCommonConfig = rpcCommonConfig;
         this.serviceStats = new ServiceStats();
-        this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(
+        this.promiseExecutorService = ThreadUtils.newFixedThreadPool(
             rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getServiceThreadBlockQueueSize(),
+            "Remoting-PromiseExecutorService", true);
+        this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
             rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
-            "promiseExecutorService", true);
-        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
-            "callServiceThread", true);
+            rpcCommonConfig.getServiceThreadBlockQueueSize(),
+            "Remoting-CallServiceThread", true);
     }
 
     private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,