You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/08/17 03:20:10 UTC

[rocketmq] branch develop updated: [ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new efa8e45  [ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)
efa8e45 is described below

commit efa8e457e84a7406923594b9516545661b56765e
Author: duheng <39...@users.noreply.github.com>
AuthorDate: Fri Aug 17 11:20:05 2018 +0800

    [ISSUE #396]Use separated thread pool and add monitor tools for transactional message (#397)
    
    * Use a separate thread pool and add monitoring tools for transactional messages
    
    * Modify log level
---
 .../apache/rocketmq/broker/BrokerController.java   | 34 ++++++++++++++++++---
 .../rocketmq/broker/latency/BrokerFastFailure.java |  3 ++
 .../broker/processor/AdminBrokerProcessor.java     |  4 +++
 .../rocketmq/broker/util/ServiceProvider.java      | 32 ++++++++++----------
 .../org/apache/rocketmq/common/BrokerConfig.java   | 35 ++++++++++++++++++++--
 5 files changed, 87 insertions(+), 21 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index f45674d..a206922 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-
 public class BrokerController {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -131,6 +130,7 @@ public class BrokerController {
     private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
+    private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
     private final FilterServerManager filterServerManager;
     private final BrokerStatsManager brokerStatsManager;
     private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
@@ -146,6 +146,7 @@ public class BrokerController {
     private ExecutorService clientManageExecutor;
     private ExecutorService heartbeatExecutor;
     private ExecutorService consumerManageExecutor;
+    private ExecutorService endTransactionExecutor;
     private boolean updateMasterHAServerAddrPeriodically = false;
     private BrokerStats brokerStats;
     private InetSocketAddress storeHost;
@@ -189,6 +190,7 @@ public class BrokerController {
         this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
         this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
 
         this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
         this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
@@ -289,8 +291,15 @@ public class BrokerController {
                 1000 * 60,
                 TimeUnit.MILLISECONDS,
                 this.heartbeatThreadPoolQueue,
-                new ThreadFactoryImpl("HeartbeatThread_",true));
+                new ThreadFactoryImpl("HeartbeatThread_", true));
 
+            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getEndTransactionThreadPoolNums(),
+                this.brokerConfig.getEndTransactionThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.endTransactionThreadPoolQueue,
+                new ThreadFactoryImpl("EndTransactionThread_"));
 
             this.consumerManageExecutor =
                 Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
@@ -536,8 +545,8 @@ public class BrokerController {
         /**
          * EndTransactionProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
 
         /**
          * Default
@@ -598,10 +607,15 @@ public class BrokerController {
         return this.headSlowTimeMills(this.queryThreadPoolQueue);
     }
 
+    public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
+        return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
+    }
+
     public void printWaterMark() {
         LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
         LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
         LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
     }
 
     public MessageStore getMessageStore() {
@@ -741,6 +755,14 @@ public class BrokerController {
         if (this.fileWatchService != null) {
             this.fileWatchService.shutdown();
         }
+
+        if (this.transactionalMessageCheckService != null) {
+            this.transactionalMessageCheckService.shutdown();
+        }
+
+        if (this.endTransactionExecutor != null) {
+            this.endTransactionExecutor.shutdown();
+        }
     }
 
     private void unregisterBrokerAll() {
@@ -1027,4 +1049,8 @@ public class BrokerController {
         AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
         this.transactionalMessageCheckListener = transactionalMessageCheckListener;
     }
+
+    public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
+        return endTransactionThreadPoolQueue;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0a8beca..a018f68 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -92,6 +92,9 @@ public class BrokerFastFailure {
 
         cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
             this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
+
+        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
+            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
     }
 
     void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1a704a8..356aafc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         runtimeInfo.put("queryThreadPoolQueueCapacity",
             String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
 
+        runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
+        runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
+            String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
+
         runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
         runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 59be7a7..8b9b63e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -125,23 +125,25 @@ public class ServiceProvider {
 
     public static <T> T loadClass(String name, Class<?> clazz) {
         final InputStream is = getResourceAsStream(getContextClassLoader(), name);
-        BufferedReader reader;
-        try {
+        if (is != null) {
+            BufferedReader reader;
             try {
-                reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
-            } catch (java.io.UnsupportedEncodingException e) {
-                reader = new BufferedReader(new InputStreamReader(is));
-            }
-            String serviceName = reader.readLine();
-            reader.close();
-            if (serviceName != null && !"".equals(serviceName)) {
-                return initService(getContextClassLoader(), serviceName, clazz);
-            } else {
-                LOG.warn("ServiceName is empty!");
-                return null;
+                try {
+                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+                } catch (java.io.UnsupportedEncodingException e) {
+                    reader = new BufferedReader(new InputStreamReader(is));
+                }
+                String serviceName = reader.readLine();
+                reader.close();
+                if (serviceName != null && !"".equals(serviceName)) {
+                    return initService(getContextClassLoader(), serviceName, clazz);
+                } else {
+                    LOG.warn("ServiceName is empty!");
+                    return null;
+                }
+            } catch (Exception e) {
+                LOG.warn("Error occurred when looking for resource file " + name, e);
             }
-        } catch (Exception e) {
-            LOG.error("Error occured when looking for resource file " + name, e);
         }
         return null;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456..963c88a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,7 +63,12 @@ public class BrokerConfig {
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
-    private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
+    private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
+
+    /**
+     * Thread numbers for EndTransactionProcessor
+     */
+    private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
 
     private int flushConsumerOffsetInterval = 1000 * 5;
 
@@ -79,6 +84,7 @@ public class BrokerConfig {
     private int clientManagerThreadPoolQueueCapacity = 1000000;
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
     private int heartbeatThreadPoolQueueCapacity = 50000;
+    private int endTransactionPoolQueueCapacity = 100000;
 
     private int filterServerNums = 0;
 
@@ -111,6 +117,7 @@ public class BrokerConfig {
     private long waitTimeMillsInSendQueue = 200;
     private long waitTimeMillsInPullQueue = 5 * 1000;
     private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
+    private long waitTimeMillsInTransactionQueue = 3 * 1000;
 
     private long startAcceptSendRequestTimeStamp = 0L;
 
@@ -156,7 +163,7 @@ public class BrokerConfig {
      * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
      */
     @ImportantField
-    private int transactionCheckMax = 5;
+    private int transactionCheckMax = 15;
 
     /**
      * Transaction message check interval.
@@ -701,4 +708,28 @@ public class BrokerConfig {
     public void setTransactionCheckInterval(long transactionCheckInterval) {
         this.transactionCheckInterval = transactionCheckInterval;
     }
+
+    public int getEndTransactionThreadPoolNums() {
+        return endTransactionThreadPoolNums;
+    }
+
+    public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
+        this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
+    }
+
+    public int getEndTransactionPoolQueueCapacity() {
+        return endTransactionPoolQueueCapacity;
+    }
+
+    public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
+        this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
+    }
+
+    public long getWaitTimeMillsInTransactionQueue() {
+        return waitTimeMillsInTransactionQueue;
+    }
+
+    public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
+        this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
+    }
 }