You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by cs...@apache.org on 2022/05/10 10:30:18 UTC

[rocketmq] branch 5.0.0-beta updated: bugfix : Returning minOffset when timestamp is larger than queue-unit max timestamp

This is an automated email from the ASF dual-hosted git repository.

cserwen pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 19996a024 bugfix : Returning minOffset when timestamp is larger than queue-unit max timestamp
19996a024 is described below

commit 19996a0244663869580dc9b9864fd367bdaa7e38
Author: hankunming <ha...@xiaomi.com>
AuthorDate: Wed Mar 30 17:56:18 2022 +0800

    bugfix : Returning minOffset when timestamp is larger than queue-unit max timestamp
---
 .../main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java  | 4 ++++
 .../java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java   | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index b956914cd..3db3c0e7e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -710,6 +710,10 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
         try {
             ByteBuffer byteBuffer = sbr.getByteBuffer();
             int left = targetMinOffset.getIndexPos(), right = targetMaxOffset.getIndexPos();
+            long maxQueueTimestamp = byteBuffer.getLong(right + MSG_STORE_TIME_OFFSET_INDEX);
+            if (timestamp >= maxQueueTimestamp) {
+                return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX);
+            }
             int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp);
             if (mid != -1) {
                 return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX);
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
index 1c8e31fce..8b3b8ddb3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
@@ -161,7 +161,7 @@ public class BatchConsumeQueueTest extends StoreTestBase {
         }
         end = System.currentTimeMillis();
         Assert.assertTrue(end - start < 2000);
-        Assert.assertEquals(-1, batchConsumeQueue.getOffsetInQueueByTime(System.currentTimeMillis()));
+        Assert.assertEquals(199991, batchConsumeQueue.getOffsetInQueueByTime(System.currentTimeMillis()));
         batchConsumeQueue.destroy();
     }