You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/20 01:36:44 UTC
[rocketmq] branch develop updated: [ISSUE #5091] Speed up broker initialization by concurrently loading ConsumeQueue (#5093)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b7bec238b [ISSUE #5091] Speed up broker initialization by concurrently loading ConsumeQueue (#5093)
b7bec238b is described below
commit b7bec238b9f2c5e7f97d469fb3ede39f89f324a8
Author: lizhimins <70...@qq.com>
AuthorDate: Tue Sep 20 09:36:18 2022 +0800
[ISSUE #5091] Speed up broker initialization by concurrently loading ConsumeQueue (#5093)
---
.../org/apache/rocketmq/common/BrokerConfig.java | 20 ++++++
.../org/apache/rocketmq/store/ConsumeQueue.java | 1 +
.../apache/rocketmq/store/DefaultMessageStore.java | 52 +++++++++-----
.../rocketmq/store/queue/ConsumeQueueStore.java | 79 +++++++++++++++++++++-
4 files changed, 132 insertions(+), 20 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f82f6a1de..4c7b53143 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -44,6 +44,9 @@ public class BrokerConfig extends BrokerIdentity {
private String brokerIP1 = RemotingUtil.getLocalAddress();
private String brokerIP2 = RemotingUtil.getLocalAddress();
+ @ImportantField
+ private boolean recoverConcurrently = false;
+
private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
private int defaultTopicQueueNums = 8;
@ImportantField
@@ -77,6 +80,7 @@ public class BrokerConfig extends BrokerIdentity {
private int consumerManageThreadPoolNums = 32;
private int loadBalanceProcessorThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32, PROCESSOR_NUMBER);
+ private int recoverThreadPoolNums = 32;
/**
* Thread numbers for EndTransactionProcessor
@@ -1323,4 +1327,20 @@ public class BrokerConfig extends BrokerIdentity {
public void setSyncControllerMetadataPeriod(long syncControllerMetadataPeriod) {
this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
}
+
+ public boolean isRecoverConcurrently() {
+ return recoverConcurrently;
+ }
+
+ public void setRecoverConcurrently(boolean recoverConcurrently) {
+ this.recoverConcurrently = recoverConcurrently;
+ }
+
+ public int getRecoverThreadPoolNums() {
+ return recoverThreadPoolNums;
+ }
+
+ public void setRecoverThreadPoolNums(int recoverThreadPoolNums) {
+ this.recoverThreadPoolNums = recoverThreadPoolNums;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 10049d54f..a40cbcd14 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -175,6 +175,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
}
}
+ @Override
public long getTotalSize() {
long totalSize = this.mappedFileQueue.getTotalFileSize();
if (isExtReadEnable()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6f2ce3da8..72e6abb25 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -272,23 +272,23 @@ public class DefaultMessageStore implements MessageStore {
try {
boolean lastExitOK = !this.isTempFileExist();
- LOGGER.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
+ LOGGER.info("last shutdown {}, store path root dir: {}",
+ lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
// load Commit Log
- result = result && this.commitLog.load();
+ result = this.commitLog.load();
// load Consume Queue
result = result && this.consumeQueueStore.load();
if (result) {
this.storeCheckpoint =
- new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
+ new StoreCheckpoint(
+ StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
- this.indexService.load(lastExitOK);
-
+ result = this.indexService.load(lastExitOK);
this.recover(lastExitOK);
-
- LOGGER.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
+ LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
}
long maxOffset = this.getMaxPhyOffset();
@@ -645,7 +645,7 @@ public class DefaultMessageStore implements MessageStore {
// truncate consume queue
this.truncateDirtyLogicFiles(offsetToTruncate);
- recoverTopicQueueTable();
+ this.recoverTopicQueueTable();
this.reputMessageService = new ReputMessageService();
this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate));
@@ -1630,21 +1630,31 @@ public class DefaultMessageStore implements MessageStore {
}
private void recover(final boolean lastExitOK) {
- long recoverCqStart = System.currentTimeMillis();
- long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
- long recoverCqEnd = System.currentTimeMillis();
+ boolean recoverConcurrently = this.brokerConfig.isRecoverConcurrently();
+ LOGGER.info("message store recover mode: {}", recoverConcurrently ? "concurrent" : "normal");
+ // recover consume queue
+ long recoverConsumeQueueStart = System.currentTimeMillis();
+ this.recoverConsumeQueue();
+ long maxPhyOffsetOfConsumeQueue = this.getMaxOffsetInConsumeQueue();
+ long recoverConsumeQueueEnd = System.currentTimeMillis();
+
+ // recover commitlog
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
- long recoverClogEnd = System.currentTimeMillis();
+
+ // recover consume offset table
+ long recoverCommitLogEnd = System.currentTimeMillis();
this.recoverTopicQueueTable();
- long recoverOffsetEnd = System.currentTimeMillis();
+ long recoverConsumeOffsetEnd = System.currentTimeMillis();
- LOGGER.info("Recover end total:{} recoverCq:{} recoverClog:{} recoverOffset:{}",
- recoverOffsetEnd - recoverCqStart, recoverCqEnd - recoverCqStart, recoverClogEnd - recoverCqEnd, recoverOffsetEnd - recoverClogEnd);
+ LOGGER.info("message store recover total cost: {} ms, " +
+ "recoverConsumeQueue: {} ms, recoverCommitLog: {} ms, recoverOffsetTable: {} ms",
+ recoverConsumeOffsetEnd - recoverConsumeQueueStart, recoverConsumeQueueEnd - recoverConsumeQueueStart,
+ recoverCommitLogEnd - recoverConsumeQueueEnd, recoverConsumeOffsetEnd - recoverCommitLogEnd);
}
@Override
@@ -1666,8 +1676,16 @@ public class DefaultMessageStore implements MessageStore {
return transientStorePool;
}
- private long recoverConsumeQueue() {
- return this.consumeQueueStore.recover();
+ private void recoverConsumeQueue() {
+ if (!this.brokerConfig.isRecoverConcurrently()) {
+ this.consumeQueueStore.recover();
+ } else {
+ this.consumeQueueStore.recoverConcurrently();
+ }
+ }
+
+ private long getMaxOffsetInConsumeQueue() {
+ return this.consumeQueueStore.getMaxOffsetInConsumeQueue();
}
public void recoverTopicQueueTable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 48cef5092..73c318b43 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -17,6 +17,16 @@
package org.apache.rocketmq.store.queue;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -172,22 +182,85 @@ public class ConsumeQueueStore {
}
}
+ private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQueue, String threadNamePrefix) {
+ return new ThreadPoolExecutor(
+ this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
+ this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ blockingQueue,
+ new ThreadFactoryImpl(threadNamePrefix));
+ }
+
public void recover(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
fileQueueLifeCycle.recover();
}
- public long recover() {
- long maxPhysicOffset = -1;
+ public void recover() {
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
this.recover(logic);
+ }
+ }
+ }
+
+ public boolean recoverConcurrently() {
+ int count = 0;
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ count += maps.values().size();
+ }
+ final CountDownLatch countDownLatch = new CountDownLatch(count);
+ BlockingQueue<Runnable> recoverQueue = new LinkedBlockingQueue<>();
+ final ExecutorService executor = buildExecutorService(recoverQueue, "RecoverConsumeQueueThread_");
+ List<FutureTask<Boolean>> result = new ArrayList<>(count);
+ try {
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (final ConsumeQueueInterface logic : maps.values()) {
+ FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
+ boolean ret = true;
+ try {
+ ((FileQueueLifeCycle) logic).recover();
+ } catch (Throwable e) {
+ ret = false;
+ log.error("Exception occurs while recover consume queue concurrently, " +
+ "topic={}, queueId={}", logic.getTopic(), logic.getQueueId(), e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ return ret;
+ });
+
+ result.add(futureTask);
+ executor.submit(futureTask);
+ }
+ }
+ countDownLatch.await();
+ for (FutureTask<Boolean> task : result) {
+ if (task != null && task.isDone()) {
+ if (!task.get()) {
+ return false;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception occurs while recover consume queue concurrently", e);
+ return false;
+ } finally {
+ executor.shutdown();
+ }
+ return true;
+ }
+
+ public long getMaxOffsetInConsumeQueue() {
+ long maxPhysicOffset = -1L;
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+ for (ConsumeQueueInterface logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
-
return maxPhysicOffset;
}