You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2017/12/01 12:37:26 UTC

[GitHub] zhouxinyu closed pull request #192: [ROCKETMQ-312] Use independent thread pool for QueryMessageProcessor

zhouxinyu closed pull request #192: [ROCKETMQ-312] Use independent thread pool for QueryMessageProcessor
URL: https://github.com/apache/rocketmq/pull/192
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index cd68552b..0a6f0b45 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -114,6 +114,7 @@
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
+    private final BlockingQueue<Runnable> queryThreadPoolQueue;
     private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
     private final FilterServerManager filterServerManager;
@@ -126,6 +127,7 @@
     private TopicConfigManager topicConfigManager;
     private ExecutorService sendMessageExecutor;
     private ExecutorService pullMessageExecutor;
+    private ExecutorService queryMessageExecutor;
     private ExecutorService adminBrokerExecutor;
     private ExecutorService clientManageExecutor;
     private ExecutorService consumerManageExecutor;
@@ -163,8 +165,8 @@ public BrokerController(
         this.slaveSynchronize = new SlaveSynchronize(this);
 
         this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
-
         this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
         this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
 
@@ -191,6 +193,10 @@ public NettyServerConfig getNettyServerConfig() {
         return pullThreadPoolQueue;
     }
 
+    public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
+        return queryThreadPoolQueue;
+    }
+
     public boolean initialize() throws CloneNotSupportedException {
         boolean result = this.topicConfigManager.load();
 
@@ -237,6 +243,14 @@ public boolean initialize() throws CloneNotSupportedException {
                 this.pullThreadPoolQueue,
                 new ThreadFactoryImpl("PullMessageThread_"));
 
+            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getQueryMessageThreadPoolNums(),
+                this.brokerConfig.getQueryMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.queryThreadPoolQueue,
+                new ThreadFactoryImpl("QueryMessageThread_"));
+
             this.adminBrokerExecutor =
                 Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                     "AdminBrokerThread_"));
@@ -404,11 +418,11 @@ public void registerProcessor() {
          * QueryMessageProcessor
          */
         NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
-        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
 
-        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
 
         /**
          * ClientManageProcessor
@@ -494,9 +508,14 @@ public long headSlowTimeMills4PullThreadPoolQueue() {
         return this.headSlowTimeMills(this.pullThreadPoolQueue);
     }
 
+    public long headSlowTimeMills4QueryThreadPoolQueue() {
+        return this.headSlowTimeMills(this.queryThreadPoolQueue);
+    }
+
     public void printWaterMark() {
         LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
         LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
     }
 
     public MessageStore getMessageStore() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index abea4ec9..d69a7870 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1205,11 +1205,17 @@ private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx,
         runtimeInfo.put("pullThreadPoolQueueCapacity",
             String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
 
+        runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size()));
+        runtimeInfo.put("queryThreadPoolQueueCapacity",
+            String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
+
         runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
         runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
 
         runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
         runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
+        runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
+
         runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
         runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
         if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9a208a3f..c344a7ce 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -59,6 +59,8 @@
      */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+    private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
+
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
@@ -73,6 +75,7 @@
     private boolean fetchNamesrvAddrByAddressServer = false;
     private int sendThreadPoolQueueCapacity = 10000;
     private int pullThreadPoolQueueCapacity = 100000;
+    private int queryThreadPoolQueueCapacity = 20000;
     private int clientManagerThreadPoolQueueCapacity = 1000000;
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
 
@@ -306,6 +309,14 @@ public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
         this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
     }
 
+    public int getQueryMessageThreadPoolNums() {
+        return queryMessageThreadPoolNums;
+    }
+
+    public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
+        this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
+    }
+
     public int getAdminBrokerThreadPoolNums() {
         return adminBrokerThreadPoolNums;
     }
@@ -394,6 +405,14 @@ public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
         this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
     }
 
+    public int getQueryThreadPoolQueueCapacity() {
+        return queryThreadPoolQueueCapacity;
+    }
+
+    public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
+        this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
+    }
+
     public boolean isBrokerTopicEnable() {
         return brokerTopicEnable;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services