You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/20 01:36:44 UTC

[rocketmq] branch develop updated: [ISSUE #5091] Speed up broker initialization by concurrently loading ConsumeQueue (#5093)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b7bec238b [ISSUE #5091] Speed up broker initialization by concurrently loading ConsumeQueue (#5093)
b7bec238b is described below

commit b7bec238b9f2c5e7f97d469fb3ede39f89f324a8
Author: lizhimins <70...@qq.com>
AuthorDate: Tue Sep 20 09:36:18 2022 +0800

    [ISSUE #5091] Speed up broker initialization by concurrently loading ConsumeQueue (#5093)
---
 .../org/apache/rocketmq/common/BrokerConfig.java   | 20 ++++++
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  1 +
 .../apache/rocketmq/store/DefaultMessageStore.java | 52 +++++++++-----
 .../rocketmq/store/queue/ConsumeQueueStore.java    | 79 +++++++++++++++++++++-
 4 files changed, 132 insertions(+), 20 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f82f6a1de..4c7b53143 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -44,6 +44,9 @@ public class BrokerConfig extends BrokerIdentity {
     private String brokerIP1 = RemotingUtil.getLocalAddress();
     private String brokerIP2 = RemotingUtil.getLocalAddress();
 
+    @ImportantField
+    private boolean recoverConcurrently = false;
+
     private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
     private int defaultTopicQueueNums = 8;
     @ImportantField
@@ -77,6 +80,7 @@ public class BrokerConfig extends BrokerIdentity {
     private int consumerManageThreadPoolNums = 32;
     private int loadBalanceProcessorThreadPoolNums = 32;
     private int heartbeatThreadPoolNums = Math.min(32, PROCESSOR_NUMBER);
+    private int recoverThreadPoolNums = 32;
 
     /**
      * Thread numbers for EndTransactionProcessor
@@ -1323,4 +1327,20 @@ public class BrokerConfig extends BrokerIdentity {
     public void setSyncControllerMetadataPeriod(long syncControllerMetadataPeriod) {
         this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
     }
+
+    public boolean isRecoverConcurrently() {
+        return recoverConcurrently;
+    }
+
+    public void setRecoverConcurrently(boolean recoverConcurrently) {
+        this.recoverConcurrently = recoverConcurrently;
+    }
+
+    public int getRecoverThreadPoolNums() {
+        return recoverThreadPoolNums;
+    }
+
+    public void setRecoverThreadPoolNums(int recoverThreadPoolNums) {
+        this.recoverThreadPoolNums = recoverThreadPoolNums;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 10049d54f..a40cbcd14 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -175,6 +175,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         }
     }
 
+    @Override
     public long getTotalSize() {
         long totalSize = this.mappedFileQueue.getTotalFileSize();
         if (isExtReadEnable()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6f2ce3da8..72e6abb25 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -272,23 +272,23 @@ public class DefaultMessageStore implements MessageStore {
 
         try {
             boolean lastExitOK = !this.isTempFileExist();
-            LOGGER.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
+            LOGGER.info("last shutdown {}, store path root dir: {}",
+                lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
 
             // load Commit Log
-            result = result && this.commitLog.load();
+            result = this.commitLog.load();
 
             // load Consume Queue
             result = result && this.consumeQueueStore.load();
 
             if (result) {
                 this.storeCheckpoint =
-                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+                    new StoreCheckpoint(
+                        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                 this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
-                this.indexService.load(lastExitOK);
-
+                result = this.indexService.load(lastExitOK);
                 this.recover(lastExitOK);
-
-                LOGGER.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
+                LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
             }
 
             long maxOffset = this.getMaxPhyOffset();
@@ -645,7 +645,7 @@ public class DefaultMessageStore implements MessageStore {
         // truncate consume queue
         this.truncateDirtyLogicFiles(offsetToTruncate);
 
-        recoverTopicQueueTable();
+        this.recoverTopicQueueTable();
 
         this.reputMessageService = new ReputMessageService();
         this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate));
@@ -1630,21 +1630,31 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     private void recover(final boolean lastExitOK) {
-        long recoverCqStart = System.currentTimeMillis();
-        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
-        long recoverCqEnd = System.currentTimeMillis();
+        boolean recoverConcurrently = this.brokerConfig.isRecoverConcurrently();
+        LOGGER.info("message store recover mode: {}", recoverConcurrently ? "concurrent" : "normal");
 
+        // recover consume queue
+        long recoverConsumeQueueStart = System.currentTimeMillis();
+        this.recoverConsumeQueue();
+        long maxPhyOffsetOfConsumeQueue = this.getMaxOffsetInConsumeQueue();
+        long recoverConsumeQueueEnd = System.currentTimeMillis();
+
+        // recover commitlog
         if (lastExitOK) {
             this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
         } else {
             this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
         }
-        long recoverClogEnd = System.currentTimeMillis();
+
+        // recover consume offset table
+        long recoverCommitLogEnd = System.currentTimeMillis();
         this.recoverTopicQueueTable();
-        long recoverOffsetEnd = System.currentTimeMillis();
+        long recoverConsumeOffsetEnd = System.currentTimeMillis();
 
-        LOGGER.info("Recover end total:{} recoverCq:{} recoverClog:{} recoverOffset:{}",
-            recoverOffsetEnd - recoverCqStart, recoverCqEnd - recoverCqStart, recoverClogEnd - recoverCqEnd, recoverOffsetEnd - recoverClogEnd);
+        LOGGER.info("message store recover total cost: {} ms, " +
+                "recoverConsumeQueue: {} ms, recoverCommitLog: {} ms, recoverOffsetTable: {} ms",
+            recoverConsumeOffsetEnd - recoverConsumeQueueStart, recoverConsumeQueueEnd - recoverConsumeQueueStart,
+            recoverCommitLogEnd - recoverConsumeQueueEnd, recoverConsumeOffsetEnd - recoverCommitLogEnd);
     }
 
     @Override
@@ -1666,8 +1676,16 @@ public class DefaultMessageStore implements MessageStore {
         return transientStorePool;
     }
 
-    private long recoverConsumeQueue() {
-        return this.consumeQueueStore.recover();
+    private void recoverConsumeQueue() {
+        if (!this.brokerConfig.isRecoverConcurrently()) {
+            this.consumeQueueStore.recover();
+        } else {
+            this.consumeQueueStore.recoverConcurrently();
+        }
+    }
+
+    private long getMaxOffsetInConsumeQueue() {
+        return this.consumeQueueStore.getMaxOffsetInConsumeQueue();
     }
 
     public void recoverTopicQueueTable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 48cef5092..73c318b43 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -17,6 +17,16 @@
 package org.apache.rocketmq.store.queue;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -172,22 +182,85 @@ public class ConsumeQueueStore {
         }
     }
 
+    private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQueue, String threadNamePrefix) {
+        return new ThreadPoolExecutor(
+            this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
+            this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            blockingQueue,
+            new ThreadFactoryImpl(threadNamePrefix));
+    }
+
     public void recover(ConsumeQueueInterface consumeQueue) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
         fileQueueLifeCycle.recover();
     }
 
-    public long recover() {
-        long maxPhysicOffset = -1;
+    public void recover() {
         for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
             for (ConsumeQueueInterface logic : maps.values()) {
                 this.recover(logic);
+            }
+        }
+    }
+
+    public boolean recoverConcurrently() {
+        int count = 0;
+        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+            count += maps.values().size();
+        }
+        final CountDownLatch countDownLatch = new CountDownLatch(count);
+        BlockingQueue<Runnable> recoverQueue = new LinkedBlockingQueue<>();
+        final ExecutorService executor = buildExecutorService(recoverQueue, "RecoverConsumeQueueThread_");
+        List<FutureTask<Boolean>> result = new ArrayList<>(count);
+        try {
+            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+                for (final ConsumeQueueInterface logic : maps.values()) {
+                    FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
+                        boolean ret = true;
+                        try {
+                            ((FileQueueLifeCycle) logic).recover();
+                        } catch (Throwable e) {
+                            ret = false;
+                            log.error("Exception occurs while recover consume queue concurrently, " +
+                                "topic={}, queueId={}", logic.getTopic(), logic.getQueueId(), e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
+                        return ret;
+                    });
+
+                    result.add(futureTask);
+                    executor.submit(futureTask);
+                }
+            }
+            countDownLatch.await();
+            for (FutureTask<Boolean> task : result) {
+                if (task != null && task.isDone()) {
+                    if (!task.get()) {
+                        return false;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Exception occurs while recover consume queue concurrently", e);
+            return false;
+        } finally {
+            executor.shutdown();
+        }
+        return true;
+    }
+
+    public long getMaxOffsetInConsumeQueue() {
+        long maxPhysicOffset = -1L;
+        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+            for (ConsumeQueueInterface logic : maps.values()) {
                 if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                     maxPhysicOffset = logic.getMaxPhysicOffset();
                 }
             }
         }
-
         return maxPhysicOffset;
     }