You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/16 09:37:10 UTC

[rocketmq] branch develop updated: [ISSUE #314] Heartbeat handler uses an independent thread pool (#315)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d8c446e  [ISSUE #314] Heartbeat handler uses an independent thread pool (#315)
d8c446e is described below

commit d8c446e854e6cce1b54c0d9d97f3189832b88001
Author: fuyou001 <fu...@gmail.com>
AuthorDate: Mon Jul 16 17:37:04 2018 +0800

    [ISSUE #314] Heartbeat handler uses an independent thread pool (#315)
---
 .../apache/rocketmq/broker/BrokerController.java   | 54 +++++++++++++---------
 .../rocketmq/broker/latency/BrokerFastFailure.java |  3 ++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 27 +++++++++++
 3 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9dbee82..f45674d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,6 +16,21 @@
  */
 package org.apache.rocketmq.broker;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.client.ClientHousekeepingService;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -85,21 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 public class BrokerController {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -129,6 +129,7 @@ public class BrokerController {
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
     private final BlockingQueue<Runnable> queryThreadPoolQueue;
     private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
+    private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
     private final FilterServerManager filterServerManager;
     private final BrokerStatsManager brokerStatsManager;
@@ -143,6 +144,7 @@ public class BrokerController {
     private ExecutorService queryMessageExecutor;
     private ExecutorService adminBrokerExecutor;
     private ExecutorService clientManageExecutor;
+    private ExecutorService heartbeatExecutor;
     private ExecutorService consumerManageExecutor;
     private boolean updateMasterHAServerAddrPeriodically = false;
     private BrokerStats brokerStats;
@@ -186,6 +188,7 @@ public class BrokerController {
         this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
         this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
 
         this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
         this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
@@ -280,6 +283,15 @@ public class BrokerController {
                 this.clientManagerThreadPoolQueue,
                 new ThreadFactoryImpl("ClientManageThread_"));
 
+            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getHeartbeatThreadPoolNums(),
+                this.brokerConfig.getHeartbeatThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.heartbeatThreadPoolQueue,
+                new ThreadFactoryImpl("HeartbeatThread_",true));
+
+
             this.consumerManageExecutor =
                 Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                     "ConsumerManageThread_"));
@@ -501,11 +513,11 @@ public class BrokerController {
          * ClientManageProcessor
          */
         ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
-        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
         this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
         this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
 
-        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
 
@@ -729,10 +741,6 @@ public class BrokerController {
         if (this.fileWatchService != null) {
             this.fileWatchService.shutdown();
         }
-
-        if (this.transactionalMessageCheckService != null) {
-            this.transactionalMessageCheckService.shutdown(false);
-        }
     }
 
     private void unregisterBrokerAll() {
@@ -990,6 +998,10 @@ public class BrokerController {
         return this.configuration;
     }
 
+    public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
+        return heartbeatThreadPoolQueue;
+    }
+
     public TransactionalMessageCheckService getTransactionalMessageCheckService() {
         return transactionalMessageCheckService;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0159d32..0a8beca 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -89,6 +89,9 @@ public class BrokerFastFailure {
 
         cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
             this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
+
+        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
+            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
     }
 
     void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index e4486da..442f456 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,6 +63,7 @@ public class BrokerConfig {
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
+    private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
 
     private int flushConsumerOffsetInterval = 1000 * 5;
 
@@ -77,6 +78,7 @@ public class BrokerConfig {
     private int queryThreadPoolQueueCapacity = 20000;
     private int clientManagerThreadPoolQueueCapacity = 1000000;
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
+    private int heartbeatThreadPoolQueueCapacity = 50000;
 
     private int filterServerNums = 0;
 
@@ -108,6 +110,7 @@ public class BrokerConfig {
     private boolean brokerFastFailureEnable = true;
     private long waitTimeMillsInSendQueue = 200;
     private long waitTimeMillsInPullQueue = 5 * 1000;
+    private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
 
     private long startAcceptSendRequestTimeStamp = 0L;
 
@@ -643,6 +646,30 @@ public class BrokerConfig {
         this.forceRegister = forceRegister;
     }
 
+    public int getHeartbeatThreadPoolQueueCapacity() {
+        return heartbeatThreadPoolQueueCapacity;
+    }
+
+    public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {
+        this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;
+    }
+
+    public int getHeartbeatThreadPoolNums() {
+        return heartbeatThreadPoolNums;
+    }
+
+    public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {
+        this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;
+    }
+
+    public long getWaitTimeMillsInHeartbeatQueue() {
+        return waitTimeMillsInHeartbeatQueue;
+    }
+
+    public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {
+        this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;
+    }
+
     public int getRegisterNameServerPeriod() {
         return registerNameServerPeriod;
     }