You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/08/17 03:20:10 UTC
[rocketmq] branch develop updated: [ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new efa8e45 [ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)
efa8e45 is described below
commit efa8e457e84a7406923594b9516545661b56765e
Author: duheng <39...@users.noreply.github.com>
AuthorDate: Fri Aug 17 11:20:05 2018 +0800
[ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)
* Use separate threadpool and add monitor tools for transaction
* Modify log level
---
.../apache/rocketmq/broker/BrokerController.java | 34 ++++++++++++++++++---
.../rocketmq/broker/latency/BrokerFastFailure.java | 3 ++
.../broker/processor/AdminBrokerProcessor.java | 4 +++
.../rocketmq/broker/util/ServiceProvider.java | 32 ++++++++++----------
.../org/apache/rocketmq/common/BrokerConfig.java | 35 ++++++++++++++++++++--
5 files changed, 87 insertions(+), 21 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index f45674d..a206922 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -131,6 +130,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
+ private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
@@ -146,6 +146,7 @@ public class BrokerController {
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
+ private ExecutorService endTransactionExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
@@ -189,6 +190,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+ this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
@@ -289,8 +291,15 @@ public class BrokerController {
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
- new ThreadFactoryImpl("HeartbeatThread_",true));
+ new ThreadFactoryImpl("HeartbeatThread_", true));
+ this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.endTransactionThreadPoolQueue,
+ new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
@@ -536,8 +545,8 @@ public class BrokerController {
/**
* EndTransactionProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* Default
@@ -598,10 +607,15 @@ public class BrokerController {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
+ public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
+ return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
+ }
+
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
+ LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
}
public MessageStore getMessageStore() {
@@ -741,6 +755,14 @@ public class BrokerController {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
+
+ if (this.transactionalMessageCheckService != null) {
+ this.transactionalMessageCheckService.shutdown();
+ }
+
+ if (this.endTransactionExecutor != null) {
+ this.endTransactionExecutor.shutdown();
+ }
}
private void unregisterBrokerAll() {
@@ -1027,4 +1049,8 @@ public class BrokerController {
AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
+
+ public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
+ return endTransactionThreadPoolQueue;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0a8beca..a018f68 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -92,6 +92,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
+
+ cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
+ .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1a704a8..356aafc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
+ runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
+ runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
+ String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
+
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 59be7a7..8b9b63e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -125,23 +125,25 @@ public class ServiceProvider {
public static <T> T loadClass(String name, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(), name);
- BufferedReader reader;
- try {
+ if (is != null) {
+ BufferedReader reader;
try {
- reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
- } catch (java.io.UnsupportedEncodingException e) {
- reader = new BufferedReader(new InputStreamReader(is));
- }
- String serviceName = reader.readLine();
- reader.close();
- if (serviceName != null && !"".equals(serviceName)) {
- return initService(getContextClassLoader(), serviceName, clazz);
- } else {
- LOG.warn("ServiceName is empty!");
- return null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+ } catch (java.io.UnsupportedEncodingException e) {
+ reader = new BufferedReader(new InputStreamReader(is));
+ }
+ String serviceName = reader.readLine();
+ reader.close();
+ if (serviceName != null && !"".equals(serviceName)) {
+ return initService(getContextClassLoader(), serviceName, clazz);
+ } else {
+ LOG.warn("ServiceName is empty!");
+ return null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error occurred when looking for resource file " + name, e);
}
- } catch (Exception e) {
- LOG.error("Error occured when looking for resource file " + name, e);
}
return null;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456..963c88a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,7 +63,12 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
- private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
+ private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
+
+ /**
+ * Thread numbers for EndTransactionProcessor
+ */
+ private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
private int flushConsumerOffsetInterval = 1000 * 5;
@@ -79,6 +84,7 @@ public class BrokerConfig {
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
+ private int endTransactionPoolQueueCapacity = 100000;
private int filterServerNums = 0;
@@ -111,6 +117,7 @@ public class BrokerConfig {
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
+ private long waitTimeMillsInTransactionQueue = 3 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
@@ -156,7 +163,7 @@ public class BrokerConfig {
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
- private int transactionCheckMax = 5;
+ private int transactionCheckMax = 15;
/**
* Transaction message check interval.
@@ -701,4 +708,28 @@ public class BrokerConfig {
public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
+
+ public int getEndTransactionThreadPoolNums() {
+ return endTransactionThreadPoolNums;
+ }
+
+ public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
+ this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
+ }
+
+ public int getEndTransactionPoolQueueCapacity() {
+ return endTransactionPoolQueueCapacity;
+ }
+
+ public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
+ this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
+ }
+
+ public long getWaitTimeMillsInTransactionQueue() {
+ return waitTimeMillsInTransactionQueue;
+ }
+
+ public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
+ this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
+ }
}