You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/01 16:24:26 UTC

[pulsar] 17/21: Fix can't read the latest message of the compacted topic (#14449)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2bc8f487a71ea812b06d119f24fd7051c82eb6e8
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 25 21:38:47 2022 +0800

    Fix can't read the latest message of the compacted topic (#14449)
    
    If the reader enabled read compacted and all the data of topic has been compacted
    to the compacted ledger, the original topic does not have any data. In this case,
    the reader is not able to read the latest message of the compacted topic.
    
    ```java
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topic)
            .startMessageId(MessageId.latest)
            .startMessageIdInclusive()
            .readCompacted(true)
            .create();
    ```
    
    The root cause is if the `startMessageIdInclusive` is true
    and the `startMessageId` is `latest`, the reader will get the
    last message ID from the broker and then seek to the last message.
    But, the seek method did not consider if there are messages in the
    compacted ledger, so not able to seek to last message of the compacted
    ledger.
    
    Add force reset option for the managed cursor, if the seek
    position < compaction horizon, we should force reset the cursor
    to the given position, so that the reader able to start read from
    the compacted ledger.
    
    (cherry picked from commit ddc51924e90550ef50fd3cdd099b6aec56aec260)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 ++---
 .../mledger/impl/ManagedCursorContainerTest.java   |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  6 +--
 .../mledger/impl/NonDurableCursorTest.java         |  2 +-
 .../service/persistent/PersistentSubscription.java | 10 ++++-
 .../pulsar/broker/service/PersistentTopicTest.java |  3 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 50 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 ++-
 9 files changed, 82 insertions(+), 15 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1fb90a..f67cd96 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -533,10 +533,14 @@ public interface ManagedCursor {
      *
      * @param position
      *            position to move the cursor to
+     * @param forceReset
+     *            whether to force reset the position even if the position is no longer in the managed ledger,
+     *            this is used by compacted topic which has data in the compacted ledger, to ensure the cursor can
+     *            read data from the compacted ledger.
      * @param callback
      *            callback object
      */
-    void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback);
+    void asyncResetCursor(Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback);
 
     /**
      * Read the specified set of positions from ManagedLedger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8c345026..8c96f0e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1135,7 +1135,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
-    public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback callback) {
+    public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) {
         checkArgument(newPos instanceof PositionImpl);
         final PositionImpl newPosition = (PositionImpl) newPos;
 
@@ -1143,9 +1143,10 @@ public class ManagedCursorImpl implements ManagedCursor {
         ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
             PositionImpl actualPosition = newPosition;
 
-            if (!ledger.isValidPosition(actualPosition) &&
-                !actualPosition.equals(PositionImpl.earliest) &&
-                !actualPosition.equals(PositionImpl.latest)) {
+            if (!ledger.isValidPosition(actualPosition)
+                && !actualPosition.equals(PositionImpl.earliest)
+                && !actualPosition.equals(PositionImpl.latest)
+                && !forceReset) {
                 actualPosition = ledger.getNextValidPosition(actualPosition);
 
                 if (actualPosition == null) {
@@ -1168,7 +1169,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         final Result result = new Result();
         final CountDownLatch counter = new CountDownLatch(1);
 
-        asyncResetCursor(newPos, new AsyncCallbacks.ResetCursorCallback() {
+        asyncResetCursor(newPos, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 counter.countDown();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index af30c9c..56de803 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -237,7 +237,8 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback) {
+        public void asyncResetCursor(final Position position, boolean forceReset,
+                AsyncCallbacks.ResetCursorCallback callback) {
 
         }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 676e92f..892d6b3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -687,7 +687,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2);
 
-        cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
@@ -738,7 +738,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                     final PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(),
                             lastPosition.getEntryId() - (5 * idx));
 
-                    cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
+                    cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() {
                         @Override
                         public void resetComplete(Object ctx) {
                             moveStatus.set(true);
@@ -787,7 +787,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
         long lastActive = cursor.getLastActive();
 
-        cursor.asyncResetCursor(lastPosition, new AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(lastPosition, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 4c2944f..5d0271d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -371,7 +371,7 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2);
 
-        cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8637ecd..9e57580 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -659,7 +659,15 @@ public class PersistentSubscription implements Subscription {
                     topicName, subName);
 
             try {
-                cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() {
+                boolean forceReset = false;
+                if (topic.getCompactedTopic() != null && topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
+                    PositionImpl horizon = (PositionImpl) topic.getCompactedTopic().getCompactionHorizon().get();
+                    PositionImpl resetTo = (PositionImpl) finalPosition;
+                    if (horizon.compareTo(resetTo) >= 0) {
+                        forceReset = true;
+                    }
+                }
+                cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() {
                     @Override
                     public void resetComplete(Object ctx) {
                         if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d94320f..2314acd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.anyString;
@@ -2177,7 +2178,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((AsyncCallbacks.ResetCursorCallback) invocationOnMock.getArguments()[1]).resetComplete(null);
             return null;
-        }).when(mockCursor).asyncResetCursor(any(), any());
+        }).when(mockCursor).asyncResetCursor(any(), anyBoolean(), any());
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
             return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index accce4ef..4ae699b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -734,6 +734,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    @Test
     public void testReadCompleteMessagesDuringTopicUnloading() throws Exception {
         String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" +
                 UUID.randomUUID();
@@ -789,4 +790,53 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages));
         }
     }
+
+    @Test
+    public void testReadCompactedLatestMessageWithInclusive() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testLedgerRollover-" +
+                UUID.randomUUID();
+        final int numMessages = 1;
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().unload(topic);
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+
+        Awaitility.await()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+                    admin.topics().unload(topic);
+                    Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1"));
+                });
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.latest)
+                .readCompacted(true)
+                .create();
+
+        Assert.assertTrue(reader.hasMessageAvailable());
+        Assert.assertEquals(reader.readNext().getMessageId(), lastMessage.get());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e5a5745..856a775 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2012,8 +2012,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     MessageIdImpl markDeletePosition = MessageIdImpl
                             .convertToMessageIdImpl(response.markDeletePosition);
 
-                    if (markDeletePosition != null) {
-                        // we only care about comparing ledger ids and entry ids as mark delete position doesn't have other ids such as batch index
+                    if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0
+                            && markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) {
+                        // we only care about comparing ledger ids and entry ids as mark delete position doesn't have
+                        // other ids such as batch index
                         int result = ComparisonChain.start()
                                 .compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId())
                                 .compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())