You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/30 01:07:49 UTC
[rocketmq] branch 5.0.0-beta updated: [ISSUE #4435] Code optimization for ConsumeQueue abstraction. (#4439)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new 05fee0d77 [ISSUE #4435] Code optimization for ConsumeQueue abstraction. (#4439)
05fee0d77 is described below
commit 05fee0d773d971cc56aafbbb95955f44fb963bed
Author: Hongjian Fei <er...@163.com>
AuthorDate: Thu Jun 30 09:07:30 2022 +0800
[ISSUE #4435] Code optimization for ConsumeQueue abstraction. (#4439)
* Delay the start of some heavy components.
* Remove the code for rechecking reput position.
* Make ConsumeQueue and BatchConsumeQueue depend on the MessageStore interface rather than on the DefaultMessageStore implementation.
* Transfer methods that should not belong to FileQueueLifeCycle to ConsumeQueueInterface.
* Make the method doRecheckReputOffsetFromCq configurable.
* Fix naming issues.
---
.../org/apache/rocketmq/store/ConsumeQueue.java | 50 ++++-----
.../apache/rocketmq/store/DefaultMessageStore.java | 123 ++++++++++++---------
.../org/apache/rocketmq/store/MessageStore.java | 7 ++
.../rocketmq/store/queue/BatchConsumeQueue.java | 26 ++---
.../store/queue/ConsumeQueueInterface.java | 22 ++++
.../rocketmq/store/queue/ConsumeQueueStore.java | 10 +-
.../rocketmq/store/queue/FileQueueLifeCycle.java | 59 +++++++++-
.../java/org/apache/rocketmq/store/HATest.java | 2 +-
.../store/dledger/MessageStoreTestBase.java | 2 +
9 files changed, 197 insertions(+), 104 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 740377dc4..1e59dd92e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -47,7 +47,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
public static final int CQ_STORE_UNIT_SIZE = 20;
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
- private final DefaultMessageStore defaultMessageStore;
+ private final MessageStore messageStore;
private final MappedFileQueue mappedFileQueue;
private final String topic;
@@ -65,10 +65,10 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
final int queueId,
final String storePath,
final int mappedFileSize,
- final DefaultMessageStore defaultMessageStore) {
+ final MessageStore messageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
- this.defaultMessageStore = defaultMessageStore;
+ this.messageStore = messageStore;
this.topic = topic;
this.queueId = queueId;
@@ -81,13 +81,13 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
- if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
+ if (messageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
this.consumeQueueExt = new ConsumeQueueExt(
topic,
queueId,
- StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
- defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
- defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
+ StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()),
+ messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
+ messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
);
}
}
@@ -188,7 +188,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
- long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
+ long minPhysicOffset = this.messageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
@@ -206,7 +206,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
}
long storeTime =
- this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ this.messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
@@ -427,7 +427,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
- boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+ boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
if (isExtWriteEnable()) {
@@ -447,11 +447,11 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
- if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
- this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
- this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
+ if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
+ this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
+ this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
- this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+ this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
if (checkMultiDispatchQueue(request)) {
multiDispatchLmqQueue(request, maxRetries);
}
@@ -471,11 +471,11 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
- this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
+ this.messageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) {
- if (!this.defaultMessageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+ if (!this.messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
return false;
}
Map<String, String> prop = dispatchRequest.getPropertiesMap();
@@ -504,7 +504,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
String queueName = queues[i];
long queueOffset = Long.parseLong(queueOffsets[i]);
int queueId = request.getQueueId();
- if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+ if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
queueId = 0;
}
doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId);
@@ -515,8 +515,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset,
int queueId) {
- ConsumeQueueInterface cq = this.defaultMessageStore.findConsumeQueue(queueName, queueId);
- boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+ ConsumeQueueInterface cq = this.messageStore.findConsumeQueue(queueName, queueId);
+ boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
boolean result = ((ConsumeQueue) cq).putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(),
request.getTagsCode(),
@@ -543,7 +543,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
msg.setQueueOffset(queueOffset);
// For LMQ
- if (!defaultMessageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+ if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
return;
}
String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
@@ -554,7 +554,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
Long[] queueOffsets = new Long[queues.length];
for (int i = 0; i < queues.length; i++) {
String key = queueKey(queues[i], msg);
- if (defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
queueOffsets[i] = queueOffsetAssigner.assignLmqOffset(key, (short) 1);
}
}
@@ -568,7 +568,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
keyBuilder.append(queueName);
keyBuilder.append('-');
int queueId = msgInner.getQueueId();
- if (defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+ if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
queueId = 0;
}
keyBuilder.append(queueId);
@@ -809,10 +809,10 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
}
@Override
- public long rollNextFile(final long index) {
+ public long rollNextFile(final long nextBeginOffset) {
int mappedFileSize = this.mappedFileSize;
int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
- return index + totalUnitsInFile - index % totalUnitsInFile;
+ return nextBeginOffset + totalUnitsInFile - nextBeginOffset % totalUnitsInFile;
}
@Override
@@ -873,7 +873,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
protected boolean isExtWriteEnable() {
return this.consumeQueueExt != null
- && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
+ && this.messageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
}
/**
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index bcb826e53..0da12a232 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -193,21 +193,12 @@ public class DefaultMessageStore implements MessageStore {
this.haService = new DefaultHAService();
LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
}
- this.haService.init(this);
}
this.reputMessageService = new ReputMessageService();
this.transientStorePool = new TransientStorePool(messageStoreConfig);
- if (messageStoreConfig.isTransientStorePoolEnable()) {
- this.transientStorePool.init();
- }
-
- this.allocateMappedFileService.start();
-
- this.indexService.start();
-
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -309,6 +300,17 @@ public class DefaultMessageStore implements MessageStore {
*/
@Override
public void start() throws Exception {
+ if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+ this.haService.init(this);
+ }
+
+ if (messageStoreConfig.isTransientStorePoolEnable()) {
+ this.transientStorePool.init();
+ }
+
+ this.allocateMappedFileService.start();
+
+ this.indexService.start();
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
@@ -321,53 +323,14 @@ public class DefaultMessageStore implements MessageStore {
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
- /**
- * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
- * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
- * 3. Calculate the reput offset according to the consume queue;
- * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
- */
- long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
- for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
- for (ConsumeQueueInterface logic : maps.values()) {
- if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
- maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
- }
- }
- }
- if (maxPhysicalPosInLogicQueue < 0) {
- maxPhysicalPosInLogicQueue = 0;
- }
- if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
- maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
- /**
- * This happens in following conditions:
- * 1. If someone removes all the consumequeue files or the disk get damaged.
- * 2. Launch a new broker, and copy the commitlog from other brokers.
- *
- * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
- * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
- */
- LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
- }
- LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
- maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
- this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+ // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
+ this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
- /**
- * 1. Finish dispatching the messages fall behind, then to start other services.
- * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
- */
- while (true) {
- if (dispatchBehindBytes() <= 0) {
- break;
- }
- Thread.sleep(1000);
- LOGGER.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
- }
- this.recoverTopicQueueTable();
+ // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover,
+ // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery.
+ this.doRecheckReputOffsetFromCq();
this.flushConsumeQueueService.start();
this.commitLog.start();
@@ -383,6 +346,59 @@ public class DefaultMessageStore implements MessageStore {
this.shutdown = false;
}
+ private void doRecheckReputOffsetFromCq() throws InterruptedException {
+ if (!messageStoreConfig.isRecheckReputOffsetFromCq()) {
+ return;
+ }
+
+ /**
+ * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
+ * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
+ * 3. Calculate the reput offset according to the consume queue;
+ * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
+ */
+ long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
+ for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
+ for (ConsumeQueueInterface logic : maps.values()) {
+ if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
+ maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
+ }
+ }
+ }
+ // If maxPhyPos(CQs) < minPhyPos(CommitLog), some newly deleted topics may be re-dispatched into cqs mistakenly.
+ if (maxPhysicalPosInLogicQueue < 0) {
+ maxPhysicalPosInLogicQueue = 0;
+ }
+ if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
+ maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
+ /**
+ * This happens in following conditions:
+ * 1. If someone removes all the consumequeue files or the disk get damaged.
+ * 2. Launch a new broker, and copy the commitlog from other brokers.
+ *
+ * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
+ * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
+ */
+ LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
+ }
+ LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
+ maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
+ this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+
+ /**
+ * 1. Finish dispatching the messages fall behind, then to start other services.
+ * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
+ */
+ while (true) {
+ if (dispatchBehindBytes() <= 0) {
+ break;
+ }
+ Thread.sleep(1000);
+ LOGGER.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
+ }
+ this.recoverTopicQueueTable();
+ }
+
@Override
public void shutdown() {
if (!this.shutdown) {
@@ -1459,6 +1475,7 @@ public class DefaultMessageStore implements MessageStore {
return null;
}
+ @Override
public ConsumeQueueInterface findConsumeQueue(String topic, int queueId) {
return this.consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 9dc27faff..94b618696 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -466,6 +466,13 @@ public interface MessageStore {
*/
ConsumeQueueInterface getConsumeQueue(String topic, int queueId);
+ /**
+ * Get consume queue of the topic/queue. If consume queue not exist, will create one then return it.
+ * @param topic Topic.
+ * @param queueId Queue ID.
+ * @return Consume queue.
+ */
+ ConsumeQueueInterface findConsumeQueue(String topic, int queueId);
/**
* Get BrokerStatsManager of the messageStore.
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 3db3c0e7e..40bc76303 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -51,7 +51,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
private static final int MSG_COMPACT_OFFSET_LENGTH = 4;
public static final int INVALID_POS = -1;
final MappedFileQueue mappedFileQueue;
- private MessageStore defaultMessageStore;
+ private final MessageStore messageStore;
private final String topic;
private final int queueId;
private final ByteBuffer byteBufferItem;
@@ -74,11 +74,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
final int queueId,
final String storePath,
final int mappedFileSize,
- final MessageStore defaultMessageStore) {
+ final MessageStore messageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
- this.defaultMessageStore = defaultMessageStore;
- this.commitLogSize = defaultMessageStore.getCommitLog().getCommitLogSize();
+ this.messageStore = messageStore;
+ this.commitLogSize = messageStore.getCommitLog().getCommitLogSize();
this.topic = topic;
this.queueId = queueId;
@@ -100,7 +100,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
}
private void refreshCache() {
- if (!this.defaultMessageStore.getMessageStoreConfig().isSearchBcqByCacheEnable()) {
+ if (!this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable()) {
return ;
}
ConcurrentSkipListMap<Long, MappedFile> newOffsetCache = new ConcurrentSkipListMap<>();
@@ -447,7 +447,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
- boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+ boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
if (request.getMsgBaseOffset() < 0 || request.getBatchSize() < 0) {
log.warn("[NOTIFYME]unexpected dispacth request in batch consume queue topic:{} queue:{} offset:{}", topic, queueId, request.getCommitLogOffset());
return;
@@ -457,10 +457,10 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
request.getMsgSize(), request.getTagsCode(),
request.getStoreTimestamp(), request.getMsgBaseOffset(), request.getBatchSize());
if (result) {
- if (BrokerRole.SLAVE == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
- this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
+ if (BrokerRole.SLAVE == this.messageStore.getMessageStoreConfig().getBrokerRole()) {
+ this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
- this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+ this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
@@ -476,7 +476,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
}
// XXX: warn and notify me
log.error("[NOTIFYME]batch consume queue can not write, {} {}", this.topic, this.queueId);
- this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
+ this.messageStore.getRunningFlags().makeLogicsQueueError();
}
@Override
@@ -594,7 +594,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
targetBcq = lastBcq;
targetMinOffset = minForLastBcq;
} else {
- boolean searchBcqByCacheEnable = this.defaultMessageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
+ boolean searchBcqByCacheEnable = this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
if (searchBcqByCacheEnable) {
// it's not the last BCQ file, so search it through cache.
targetBcq = this.searchOffsetFromCache(msgOffset);
@@ -675,7 +675,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
targetBcq = lastBcq;
targetMinOffset = minForLastBcq;
} else {
- boolean searchBcqByCacheEnable = this.defaultMessageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
+ boolean searchBcqByCacheEnable = this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
if (searchBcqByCacheEnable) {
// it's not the last BCQ file, so search it through cache.
targetBcq = this.searchTimeFromCache(timestamp);
@@ -941,7 +941,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
}
@Override
- public long rollNextFile(long offset) {
+ public long rollNextFile(long nextBeginOffset) {
return 0;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index 6d1b68c45..c45592542 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.store.queue;
import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.DispatchRequest;
public interface ConsumeQueueInterface {
/**
@@ -117,4 +119,24 @@ public interface ConsumeQueueInterface {
* @return total size
*/
long getTotalSize();
+
+ /**
+ * Correct min offset by min commit log offset.
+ * @param minCommitLogOffset min commit log offset
+ */
+ void correctMinOffset(long minCommitLogOffset);
+
+ /**
+ * Do dispatch.
+ * @param request the request containing dispatch information.
+ */
+ void putMessagePositionInfoWrapper(DispatchRequest request);
+
+ /**
+ * Assign queue offset.
+ * @param queueOffsetAssigner the delegated queue offset assigner
+ * @param msg message itself
+ * @param messageNum message number
+ */
+ void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index beb053247..48cef5092 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -78,8 +78,7 @@ public class ConsumeQueueStore {
}
public void correctMinOffset(ConsumeQueueInterface consumeQueue, long minCommitLogOffset) {
- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.correctMinOffset(minCommitLogOffset);
+ consumeQueue.correctMinOffset(minCommitLogOffset);
}
/**
@@ -89,8 +88,7 @@ public class ConsumeQueueStore {
* @param request dispatch request
*/
public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) {
- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.putMessagePositionInfoWrapper(request);
+ consumeQueue.putMessagePositionInfoWrapper(request);
}
public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) {
@@ -315,8 +313,8 @@ public class ConsumeQueueStore {
}
public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum) {
- FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(msg.getTopic(), msg.getQueueId());
- fileQueueLifeCycle.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum);
+ ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());
+ consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum);
}
public void updateQueueOffset(String topic, int queueId, long offset) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
index b2211d8ad..95cc0887f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
@@ -16,22 +16,69 @@
*/
package org.apache.rocketmq.store.queue;
-import org.apache.rocketmq.store.DispatchRequest;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.Swappable;
+/**
+ * FileQueueLifeCycle contains life cycle methods of ConsumerQueue that is directly implemented by FILE.
+ */
public interface FileQueueLifeCycle extends Swappable {
+ /**
+ * Load from file.
+ * @return true if loaded successfully.
+ */
boolean load();
+
+ /**
+ * Recover from file.
+ */
void recover();
+
+ /**
+ * Check files.
+ */
void checkSelf();
+
+ /**
+ * Flush cache to file.
+ * @param flushLeastPages the minimum number of pages to be flushed
+ * @return true if any data has been flushed.
+ */
boolean flush(int flushLeastPages);
+
+ /**
+ * Destroy files.
+ */
void destroy();
+
+ /**
+ * Truncate dirty logic files starting at max commit log position.
+ * @param maxCommitLogPos max commit log position
+ */
void truncateDirtyLogicFiles(long maxCommitLogPos);
+
+ /**
+ * Delete expired files ending at min commit log position.
+ * @param minCommitLogPos min commit log position
+ * @return deleted file numbers.
+ */
int deleteExpiredFile(long minCommitLogPos);
- long rollNextFile(final long offset);
+
+ /**
+ * Roll to next file.
+ * @param nextBeginOffset next begin offset
+ * @return the beginning offset of the next file
+ */
+ long rollNextFile(final long nextBeginOffset);
+
+ /**
+ * Is the first file available?
+ * @return true if it's available
+ */
boolean isFirstFileAvailable();
+
+ /**
+ * Does the first file exist?
+ * @return true if it exists
+ */
boolean isFirstFileExist();
- void correctMinOffset(long minCommitLogOffset);
- void putMessagePositionInfoWrapper(DispatchRequest request);
- void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index 9ef960f50..bee9d6429 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -85,11 +85,11 @@ public class HATest {
slaveMessageStore = buildMessageStore(slaveStoreConfig, 1L);
boolean load = messageStore.load();
boolean slaveLoad = slaveMessageStore.load();
- slaveMessageStore.updateHaMasterAddress("127.0.0.1:10912");
assertTrue(load);
assertTrue(slaveLoad);
messageStore.start();
slaveMessageStore.start();
+ slaveMessageStore.updateHaMasterAddress("127.0.0.1:10912");
Thread.sleep(6000L);//because the haClient will wait 5s after the first connectMaster failed,sleep 6s
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index f1bc996a8..3ae0cb64e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -56,6 +56,8 @@ public class MessageStoreTestBase extends StoreTestBase {
storeConfig.setdLegerGroup(group);
storeConfig.setdLegerPeers(peers);
storeConfig.setdLegerSelfId(selfId);
+
+ storeConfig.setRecheckReputOffsetFromCq(true);
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("DLedgerCommitlogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
}, new BrokerConfig());