You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/20 12:37:42 UTC
[3/3] incubator-rocketmq git commit: Polish thread pool create method
Polish thread pool create method
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6593294f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6593294f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6593294f
Branch: refs/heads/rocketmq5
Commit: 6593294f07dec7039668e7f5807f530433c3bbda
Parents: 489b1d8
Author: yukon <yu...@apache.org>
Authored: Wed Sep 20 17:39:46 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Sep 20 20:37:15 2017 +0800
----------------------------------------------------------------------
remoting-core/pom.xml | 2 +-
.../rocketmq/remoting/external/ThreadUtils.java | 26 +++++++++++++-------
.../impl/netty/NettyRemotingAbstract.java | 8 ++----
.../rpc/impl/client/SimpleClientImpl.java | 10 +++-----
.../rpc/impl/server/SimpleServerImpl.java | 10 +++-----
.../rpc/impl/service/RpcInstanceAbstract.java | 18 ++++----------
.../rpc/impl/service/RpcProxyCommon.java | 19 +++++---------
7 files changed, 37 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/pom.xml
----------------------------------------------------------------------
diff --git a/remoting-core/pom.xml b/remoting-core/pom.xml
index 702b826..997011b 100644
--- a/remoting-core/pom.xml
+++ b/remoting-core/pom.xml
@@ -77,7 +77,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.1.6.Final</version>
+ <version>4.1.15.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
index 1a80d20..5a50089 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -43,25 +44,32 @@ public final class ThreadUtils {
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
- BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
- return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+ BlockingQueue<Runnable> workQueue,
+ String processName, boolean isDaemon) {
+ return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
+ return new ThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(workQueueCapacity),
+ newGenericThreadFactory(processName, isDaemon));
}
public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
- return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
+ return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
- return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
+ return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
}
public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
boolean isDaemon) {
- return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
- }
-
- public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
- return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+ return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon));
}
public static ThreadFactory newGenericThreadFactory(String processName) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index a5c2118..82b17f4 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -92,13 +92,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
- this.publicExecutor = ThreadUtils.newThreadPoolExecutor(
+ this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientAsyncCallbackExecutorThreads(),
- clientConfig.getClientAsyncCallbackExecutorThreads(),
- 60,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(10000),
- "PublicExecutor", true);
+ 10000, "Remoting-PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
index 4483ca3..35931b2 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.client;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.RemotingService;
@@ -48,13 +47,10 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
super(rpcCommonConfig);
this.remotingClient = remotingClient;
this.rpcCommonConfig = rpcCommonConfig;
- this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+ this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
- "clientCallServiceThread", true);
+ rpcCommonConfig.getServiceThreadBlockQueueSize(),
+ "RPC-ClientCallServiceThread", true);
}
public void initialize() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
index 1fdda49..469e0c7 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.server;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.RemotingService;
@@ -40,13 +39,10 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe
public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
this.rpcCommonConfig = remotingConfig;
- this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+ this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
- "serverCallServiceThread", true);
+ remotingConfig.getServiceThreadBlockQueueSize(),
+ "RPC-ServerCallServiceThread", true);
}
public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
index 8c0ddf2..7ece4a8 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
@@ -18,8 +18,6 @@
package org.apache.rocketmq.rpc.impl.service;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.rpc.impl.command.RpcRequestCode;
@@ -29,15 +27,13 @@ import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl;
import org.apache.rocketmq.rpc.impl.metrics.ThreadStats;
import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor;
-import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory;
-
public abstract class RpcInstanceAbstract extends RpcProxyCommon {
protected final RpcRequestProcessor rpcRequestProcessor;
protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>();
protected final RpcCommonConfig rpcCommonConfig;
protected ThreadStats threadStats;
private DefaultServiceAPIImpl defaultServiceAPI;
- private ThreadPoolExecutor invokeServiceThreadPool;
+ private ExecutorService invokeServiceThreadPool;
public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) {
super(rpcCommonConfig);
@@ -45,13 +41,9 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
this.rpcCommonConfig = rpcCommonConfig;
this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
- this.invokeServiceThreadPool = new ThreadPoolExecutor(
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ this.invokeServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- 60,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
- newThreadFactory("rpcInvokeServiceThread", true));
+ rpcCommonConfig.getServiceThreadBlockQueueSize(),"RPC-InvokeServiceThread", true);
}
@@ -81,11 +73,11 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
public abstract void registerServiceListener();
- public ThreadPoolExecutor getInvokeServiceThreadPool() {
+ public ExecutorService getInvokeServiceThreadPool() {
return invokeServiceThreadPool;
}
- public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) {
+ public void setInvokeServiceThreadPool(ExecutorService invokeServiceThreadPool) {
this.invokeServiceThreadPool = invokeServiceThreadPool;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
index 2487f79..ac8c208 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
@@ -25,7 +25,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingService;
@@ -68,20 +67,14 @@ public abstract class RpcProxyCommon {
public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) {
this.rpcCommonConfig = rpcCommonConfig;
this.serviceStats = new ServiceStats();
- this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(
+ this.promiseExecutorService = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getServiceThreadBlockQueueSize(),
+ "Remoting-PromiseExecutorService", true);
+ this.callServiceThreadPool = ThreadUtils.newFixedThreadPool(
rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
- "promiseExecutorService", true);
- this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
- "callServiceThread", true);
+ rpcCommonConfig.getServiceThreadBlockQueueSize(),
+ "Remoting-CallServiceThread", true);
}
private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,