You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/17 03:20:07 UTC

[GitHub] vongosling closed pull request #397: [ISSUE #396]Use separated thread pool and add monitor tools for transactional message

vongosling closed pull request #397: [ISSUE #396]Use separated thread pool and add monitor tools for transactional message
URL: https://github.com/apache/rocketmq/pull/397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index f45674d6e..a20692291 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -100,7 +100,6 @@
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-
 public class BrokerController {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -131,6 +130,7 @@
     private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
+    private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
     private final FilterServerManager filterServerManager;
     private final BrokerStatsManager brokerStatsManager;
     private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
@@ -146,6 +146,7 @@
     private ExecutorService clientManageExecutor;
     private ExecutorService heartbeatExecutor;
     private ExecutorService consumerManageExecutor;
+    private ExecutorService endTransactionExecutor;
     private boolean updateMasterHAServerAddrPeriodically = false;
     private BrokerStats brokerStats;
     private InetSocketAddress storeHost;
@@ -189,6 +190,7 @@ public BrokerController(
         this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
         this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
 
         this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
         this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
@@ -289,8 +291,15 @@ public boolean initialize() throws CloneNotSupportedException {
                 1000 * 60,
                 TimeUnit.MILLISECONDS,
                 this.heartbeatThreadPoolQueue,
-                new ThreadFactoryImpl("HeartbeatThread_",true));
+                new ThreadFactoryImpl("HeartbeatThread_", true));
 
+            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getEndTransactionThreadPoolNums(),
+                this.brokerConfig.getEndTransactionThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.endTransactionThreadPoolQueue,
+                new ThreadFactoryImpl("EndTransactionThread_"));
 
             this.consumerManageExecutor =
                 Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
@@ -536,8 +545,8 @@ public void registerProcessor() {
         /**
          * EndTransactionProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
 
         /**
          * Default
@@ -598,10 +607,15 @@ public long headSlowTimeMills4QueryThreadPoolQueue() {
         return this.headSlowTimeMills(this.queryThreadPoolQueue);
     }
 
+    public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
+        return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
+    }
+
     public void printWaterMark() {
         LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
         LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
         LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
     }
 
     public MessageStore getMessageStore() {
@@ -741,6 +755,14 @@ public void shutdown() {
         if (this.fileWatchService != null) {
             this.fileWatchService.shutdown();
         }
+
+        if (this.transactionalMessageCheckService != null) {
+            this.transactionalMessageCheckService.shutdown();
+        }
+
+        if (this.endTransactionExecutor != null) {
+            this.endTransactionExecutor.shutdown();
+        }
     }
 
     private void unregisterBrokerAll() {
@@ -1027,4 +1049,8 @@ public void setTransactionalMessageCheckListener(
         AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
         this.transactionalMessageCheckListener = transactionalMessageCheckListener;
     }
+
+    public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
+        return endTransactionThreadPoolQueue;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0a8beca2c..a018f68f6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -92,6 +92,9 @@ private void cleanExpiredRequest() {
 
         cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
             this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
+
+        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
+            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
     }
 
     void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1a704a8c6..356aafc46 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1210,6 +1210,10 @@ private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx,
         runtimeInfo.put("queryThreadPoolQueueCapacity",
             String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
 
+        runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
+        runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
+            String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
+
         runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
         runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 59be7a7e2..8b9b63e4d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -125,23 +125,25 @@ protected static InputStream getResourceAsStream(ClassLoader loader, String name
 
     public static <T> T loadClass(String name, Class<?> clazz) {
         final InputStream is = getResourceAsStream(getContextClassLoader(), name);
-        BufferedReader reader;
-        try {
+        if (is != null) {
+            BufferedReader reader;
             try {
-                reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
-            } catch (java.io.UnsupportedEncodingException e) {
-                reader = new BufferedReader(new InputStreamReader(is));
-            }
-            String serviceName = reader.readLine();
-            reader.close();
-            if (serviceName != null && !"".equals(serviceName)) {
-                return initService(getContextClassLoader(), serviceName, clazz);
-            } else {
-                LOG.warn("ServiceName is empty!");
-                return null;
+                try {
+                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+                } catch (java.io.UnsupportedEncodingException e) {
+                    reader = new BufferedReader(new InputStreamReader(is));
+                }
+                String serviceName = reader.readLine();
+                reader.close();
+                if (serviceName != null && !"".equals(serviceName)) {
+                    return initService(getContextClassLoader(), serviceName, clazz);
+                } else {
+                    LOG.warn("ServiceName is empty!");
+                    return null;
+                }
+            } catch (Exception e) {
+                LOG.warn("Error occurred when looking for resource file " + name, e);
             }
-        } catch (Exception e) {
-            LOG.error("Error occured when looking for resource file " + name, e);
         }
         return null;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456aa..963c88a13 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,7 +63,12 @@
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
-    private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
+    private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
+
+    /**
+     * Thread numbers for EndTransactionProcessor
+     */
+    private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
 
     private int flushConsumerOffsetInterval = 1000 * 5;
 
@@ -79,6 +84,7 @@
     private int clientManagerThreadPoolQueueCapacity = 1000000;
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
     private int heartbeatThreadPoolQueueCapacity = 50000;
+    private int endTransactionPoolQueueCapacity = 100000;
 
     private int filterServerNums = 0;
 
@@ -111,6 +117,7 @@
     private long waitTimeMillsInSendQueue = 200;
     private long waitTimeMillsInPullQueue = 5 * 1000;
     private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
+    private long waitTimeMillsInTransactionQueue = 3 * 1000;
 
     private long startAcceptSendRequestTimeStamp = 0L;
 
@@ -156,7 +163,7 @@
      * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
      */
     @ImportantField
-    private int transactionCheckMax = 5;
+    private int transactionCheckMax = 15;
 
     /**
      * Transaction message check interval.
@@ -701,4 +708,28 @@ public long getTransactionCheckInterval() {
     public void setTransactionCheckInterval(long transactionCheckInterval) {
         this.transactionCheckInterval = transactionCheckInterval;
     }
+
+    public int getEndTransactionThreadPoolNums() {
+        return endTransactionThreadPoolNums;
+    }
+
+    public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
+        this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
+    }
+
+    public int getEndTransactionPoolQueueCapacity() {
+        return endTransactionPoolQueueCapacity;
+    }
+
+    public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
+        this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
+    }
+
+    public long getWaitTimeMillsInTransactionQueue() {
+        return waitTimeMillsInTransactionQueue;
+    }
+
+    public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
+        this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services