You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/20 12:37:41 UTC
[2/3] incubator-rocketmq git commit: Use LinkedBlockingQueue for
better performance
Use LinkedBlockingQueue for better performance
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/489b1d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/489b1d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/489b1d8b
Branch: refs/heads/rocketmq5
Commit: 489b1d8b7829990bd5b6ecef1ab5aeae685a1ec4
Parents: 114b6ae
Author: yukon <yu...@apache.org>
Authored: Wed Sep 20 17:18:43 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Sep 20 17:18:43 2017 +0800
----------------------------------------------------------------------
.../impl/netty/NettyRemotingAbstract.java | 3 +--
.../rpc/impl/client/SimpleClientImpl.java | 12 +++++++----
.../rpc/impl/server/SimpleServerImpl.java | 11 ++++++----
.../rpc/impl/service/RpcInstanceAbstract.java | 12 +++++++----
.../rpc/impl/service/RpcProxyCommon.java | 22 +++++++++++++-------
5 files changed, 39 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 4c22e7c..a5c2118 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
clientConfig.getClientAsyncCallbackExecutorThreads(),
60,
TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(10000),
+ new LinkedBlockingQueue<Runnable>(10000),
"PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
index 787e8c1..4483ca3 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.rpc.impl.client;
import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.RemotingService;
@@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
super(rpcCommonConfig);
this.remotingClient = remotingClient;
this.rpcCommonConfig = rpcCommonConfig;
- this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true);
+ this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getServiceThreadKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+ "clientCallServiceThread", true);
}
public void initialize() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
index e076cbe..1fdda49 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.rpc.impl.server;
import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.RemotingService;
@@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe
public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
this.rpcCommonConfig = remotingConfig;
- this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
+ this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getServiceThreadKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
"serverCallServiceThread", true);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
index 2b1288c..8c0ddf2 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
@@ -17,8 +17,8 @@
package org.apache.rocketmq.rpc.impl.service;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.external.ThreadUtils;
@@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
this.rpcCommonConfig = rpcCommonConfig;
this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
- this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true));
+ this.invokeServiceThreadPool = new ThreadPoolExecutor(
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+ newThreadFactory("rpcInvokeServiceThread", true));
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
index c5d9a3c..2487f79 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
@@ -21,11 +21,11 @@ import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingService;
@@ -68,12 +68,20 @@ public abstract class RpcProxyCommon {
public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) {
this.rpcCommonConfig = rpcCommonConfig;
this.serviceStats = new ServiceStats();
- this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "promiseExecutorService", true);
- this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
- rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
- TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "callServiceThread", true);
+ this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getServiceThreadKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+ "promiseExecutorService", true);
+ this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+ rpcCommonConfig.getServiceThreadKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+ "callServiceThread", true);
}
private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,