You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/16 09:37:10 UTC
[rocketmq] branch develop updated: [ISSUE #314] Heartbeat handler use independently thread pool (#315)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d8c446e [ISSUE #314] Heartbeat handler use independently thread pool (#315)
d8c446e is described below
commit d8c446e854e6cce1b54c0d9d97f3189832b88001
Author: fuyou001 <fu...@gmail.com>
AuthorDate: Mon Jul 16 17:37:04 2018 +0800
[ISSUE #314] Heartbeat handler use independently thread pool (#315)
---
.../apache/rocketmq/broker/BrokerController.java | 54 +++++++++++++---------
.../rocketmq/broker/latency/BrokerFastFailure.java | 3 ++
.../org/apache/rocketmq/common/BrokerConfig.java | 27 +++++++++++
3 files changed, 63 insertions(+), 21 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9dbee82..f45674d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,6 +16,21 @@
*/
package org.apache.rocketmq.broker;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -85,21 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -129,6 +129,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
+ private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
@@ -143,6 +144,7 @@ public class BrokerController {
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
+ private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
@@ -186,6 +188,7 @@ public class BrokerController {
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+ this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
@@ -280,6 +283,15 @@ public class BrokerController {
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
+ this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getHeartbeatThreadPoolNums(),
+ this.brokerConfig.getHeartbeatThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.heartbeatThreadPoolQueue,
+ new ThreadFactoryImpl("HeartbeatThread_",true));
+
+
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
@@ -501,11 +513,11 @@ public class BrokerController {
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
@@ -729,10 +741,6 @@ public class BrokerController {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
-
- if (this.transactionalMessageCheckService != null) {
- this.transactionalMessageCheckService.shutdown(false);
- }
}
private void unregisterBrokerAll() {
@@ -990,6 +998,10 @@ public class BrokerController {
return this.configuration;
}
+ public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
+ return heartbeatThreadPoolQueue;
+ }
+
public TransactionalMessageCheckService getTransactionalMessageCheckService() {
return transactionalMessageCheckService;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0159d32..0a8beca 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -89,6 +89,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
+
+ cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
+ this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index e4486da..442f456 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,6 +63,7 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
+ private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
private int flushConsumerOffsetInterval = 1000 * 5;
@@ -77,6 +78,7 @@ public class BrokerConfig {
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
+ private int heartbeatThreadPoolQueueCapacity = 50000;
private int filterServerNums = 0;
@@ -108,6 +110,7 @@ public class BrokerConfig {
private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
+ private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
@@ -643,6 +646,30 @@ public class BrokerConfig {
this.forceRegister = forceRegister;
}
+ public int getHeartbeatThreadPoolQueueCapacity() {
+ return heartbeatThreadPoolQueueCapacity;
+ }
+
+ public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {
+ this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;
+ }
+
+ public int getHeartbeatThreadPoolNums() {
+ return heartbeatThreadPoolNums;
+ }
+
+ public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {
+ this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;
+ }
+
+ public long getWaitTimeMillsInHeartbeatQueue() {
+ return waitTimeMillsInHeartbeatQueue;
+ }
+
+ public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {
+ this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;
+ }
+
public int getRegisterNameServerPeriod() {
return registerNameServerPeriod;
}