You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/29 13:03:33 UTC
[rocketmq] branch develop updated: [ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount calculating (#5171)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 368d12516 [ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount calculating (#5171)
368d12516 is described below
commit 368d12516d102fa0161e2f2c4248c73d2d221024
Author: Nocturne <35...@users.noreply.github.com>
AuthorDate: Thu Sep 29 21:03:10 2022 +0800
[ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount calculating (#5171)
* fix bug where a size was compared with a count
* add getUnitSize function
---
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | 5 +++++
.../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 4 ++--
.../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java | 5 +++++
.../java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java | 6 ++++++
4 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a40cbcd14..59231dcf4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -184,6 +184,11 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
return totalSize;
}
+ @Override
+ public int getUnitSize() {
+ return CQ_STORE_UNIT_SIZE;
+ }
+
@Override
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 72e6abb25..628444331 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -729,7 +729,7 @@ public class DefaultMessageStore implements MessageStore {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
} else {
- final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ final int maxFilterMessageSize = Math.max(16000, maxMsgNums * consumeQueue.getUnitSize());
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
long maxPullSize = Math.max(maxTotalMsgSize, 100);
@@ -764,7 +764,7 @@ public class DefaultMessageStore implements MessageStore {
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
- if (cqUnit.getQueueOffset() - offset > maxFilterMessageCount) {
+ if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() > maxFilterMessageSize) {
break;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 4cd0088b1..99bfa552c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -926,6 +926,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
return this.mappedFileQueue.getTotalFileSize();
}
+ @Override
+ public int getUnitSize() {
+ return CQ_STORE_UNIT_SIZE;
+ }
+
@Override
public void destroy() {
this.maxMsgPhyOffsetInCommitLog = -1;
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index c45592542..f36dda094 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -120,6 +120,12 @@ public interface ConsumeQueueInterface {
*/
long getTotalSize();
+ /**
+ * Get the unit size of this CQ, which differs between CQ implementations
+ * @return cq unit size
+ */
+ int getUnitSize();
+
/**
* Correct min offset by min commit log offset.
* @param minCommitLogOffset min commit log offset