You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/29 13:03:33 UTC

[rocketmq] branch develop updated: [ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount calculating (#5171)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 368d12516 [ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount calculating (#5171)
368d12516 is described below

commit 368d12516d102fa0161e2f2c4248c73d2d221024
Author: Nocturne <35...@users.noreply.github.com>
AuthorDate: Thu Sep 29 21:03:10 2022 +0800

    [ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount calculating (#5171)
    
    * fix bug for comparing size with count
    
    * add getUnitSize function
---
 store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java     | 5 +++++
 .../main/java/org/apache/rocketmq/store/DefaultMessageStore.java    | 4 ++--
 .../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java     | 5 +++++
 .../java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java | 6 ++++++
 4 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a40cbcd14..59231dcf4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -184,6 +184,11 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         return totalSize;
     }
 
+    @Override
+    public int getUnitSize() {
+        return CQ_STORE_UNIT_SIZE;
+    }
+
     @Override
     public long getOffsetInQueueByTime(final long timestamp) {
         MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 72e6abb25..628444331 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -729,7 +729,7 @@ public class DefaultMessageStore implements MessageStore {
                 status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                 nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
             } else {
-                final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+                final int maxFilterMessageSize = Math.max(16000, maxMsgNums * consumeQueue.getUnitSize());
                 final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
 
                 long maxPullSize = Math.max(maxTotalMsgSize, 100);
@@ -764,7 +764,7 @@ public class DefaultMessageStore implements MessageStore {
 
                             boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
 
-                            if (cqUnit.getQueueOffset() - offset > maxFilterMessageCount) {
+                            if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() > maxFilterMessageSize) {
                                 break;
                             }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 4cd0088b1..99bfa552c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -926,6 +926,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
         return this.mappedFileQueue.getTotalFileSize();
     }
 
+    @Override
+    public int getUnitSize() {
+        return CQ_STORE_UNIT_SIZE;
+    }
+
     @Override
     public void destroy() {
         this.maxMsgPhyOffsetInCommitLog = -1;
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index c45592542..f36dda094 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -120,6 +120,12 @@ public interface ConsumeQueueInterface {
      */
     long getTotalSize();
 
+    /**
+     * Get the unit size of this CQ which is different in different CQ impl
+     * @return cq unit size
+     */
+    int getUnitSize();
+
     /**
      * Correct min offset by min commit log offset.
      * @param minCommitLogOffset min commit log offset