You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/20 12:37:41 UTC

[2/3] incubator-rocketmq git commit: Use LinkedBlockingQueue for better performance

Use LinkedBlockingQueue for better performance


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/489b1d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/489b1d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/489b1d8b

Branch: refs/heads/rocketmq5
Commit: 489b1d8b7829990bd5b6ecef1ab5aeae685a1ec4
Parents: 114b6ae
Author: yukon <yu...@apache.org>
Authored: Wed Sep 20 17:18:43 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Sep 20 17:18:43 2017 +0800

----------------------------------------------------------------------
 .../impl/netty/NettyRemotingAbstract.java       |  3 +--
 .../rpc/impl/client/SimpleClientImpl.java       | 12 +++++++----
 .../rpc/impl/server/SimpleServerImpl.java       | 11 ++++++----
 .../rpc/impl/service/RpcInstanceAbstract.java   | 12 +++++++----
 .../rpc/impl/service/RpcProxyCommon.java        | 22 +++++++++++++-------
 5 files changed, 39 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 4c22e7c..a5c2118 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             clientConfig.getClientAsyncCallbackExecutorThreads(),
             60,
             TimeUnit.SECONDS,
-            new ArrayBlockingQueue<Runnable>(10000),
+            new LinkedBlockingQueue<Runnable>(10000),
             "PublicExecutor", true);
         this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
index 787e8c1..4483ca3 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java
@@ -18,8 +18,8 @@
 package org.apache.rocketmq.rpc.impl.client;
 
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien
         super(rpcCommonConfig);
         this.remotingClient = remotingClient;
         this.rpcCommonConfig = rpcCommonConfig;
-        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true);
+        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getServiceThreadKeepAliveTime(),
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+            "clientCallServiceThread", true);
     }
 
     public void initialize() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
index e076cbe..1fdda49 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java
@@ -18,8 +18,8 @@
 package org.apache.rocketmq.rpc.impl.server;
 
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe
     public SimpleServerImpl(final RpcCommonConfig remotingConfig) {
         this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig));
         this.rpcCommonConfig = remotingConfig;
-        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
+        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getServiceThreadKeepAliveTime(),
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()),
             "serverCallServiceThread", true);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
index 2b1288c..8c0ddf2 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java
@@ -17,8 +17,8 @@
 
 package org.apache.rocketmq.rpc.impl.service;
 
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
@@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon {
         this.rpcCommonConfig = rpcCommonConfig;
         this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats);
 
-        this.invokeServiceThreadPool = new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true));
+        this.invokeServiceThreadPool = new ThreadPoolExecutor(
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            60,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+            newThreadFactory("rpcInvokeServiceThread", true));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
index c5d9a3c..2487f79 100644
--- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
+++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java
@@ -21,11 +21,11 @@ import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -68,12 +68,20 @@ public abstract class RpcProxyCommon {
     public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) {
         this.rpcCommonConfig = rpcCommonConfig;
         this.serviceStats = new ServiceStats();
-        this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "promiseExecutorService", true);
-        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
-            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(),
-            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "callServiceThread", true);
+        this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getServiceThreadKeepAliveTime(),
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+            "promiseExecutorService", true);
+        this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getClientAsyncCallbackExecutorThreads(),
+            rpcCommonConfig.getServiceThreadKeepAliveTime(),
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()),
+            "callServiceThread", true);
     }
 
     private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,