You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:22:11 UTC

[pulsar] 17/18: Return message ID from compacted ledger when the compaction cursor reaches the end of the topic (#13533)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 50626776c6d69b6d9951e67c2034ae60b48bd408
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 29 13:26:57 2021 +0800

    Return message ID from compacted ledger when the compaction cursor reaches the end of the topic (#13533)
    
    ### Motivation
    
    The problem occurs when the compaction cursor reaches the end of the topic but the tail messages
    of the topic were removed during topic compaction because the producer wrote null-value messages for their keys.
    
    For example:
    
    - 5 messages in the original topic with key: 0,1,2,3,4
    - the corresponding message IDs are: 1:0, 1:1, 1:2, 1:3, 1:4
    - the producer sends null-value messages for keys 3 and 4
    - trigger the topic compaction task
    
    After the compaction task completes:
    
    - 7 messages in the original topic: 1:0, 1:1, 1:2, 1:3, 1:4, 1:5, 1:6 (1:5 and 1:6 are the null-value messages)
    - 3 messages in the compacted ledger: 1:0, 1:1, 1:2
    
    At this point, if the reader requests the last message ID of the topic,
    the broker should return `1:2`, not `1:6`, because the reader cannot read the messages
    with keys `3` and `4` from the compacted topic. Otherwise, the `reader.readNext()` method
    will block until a new message is written to the topic.
    
    ### Modifications
    
    The fix is straightforward: when the broker receives a get-last-message-ID request,
    it checks whether the compaction cursor has reached the end of the original topic.
    If so, it responds with the last message ID from the compacted ledger.
    
    ### Verifying this change
    
    Added a new test, `testHasMessageAvailableWithNullValueMessage`, which ensures that `hasMessageAvailable()`
    returns false when no more messages can be read from the compacted topic and the compaction cursor has reached the end of the topic.
    
    (cherry picked from commit d49415e5a558ea9a82c93e55e77b2c3542eacb10)
    (cherry picked from commit c916233918afc9c441d64b563d0aa4cc6ffd4810)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++-
 .../apache/pulsar/compaction/CompactedTopic.java   |  2 +
 .../pulsar/compaction/CompactedTopicImpl.java      |  6 +--
 .../pulsar/compaction/CompactedTopicTest.java      | 48 +++++++++++++++++++++-
 4 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index dbab10d..c4ab0e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1716,7 +1716,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the current position.
-        if (lastPosition.getEntryId() == -1) {
+        // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
+        Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
+        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
             handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
                     markDeletePosition);
             return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 31955a5..9e50fc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -34,4 +35,5 @@ public interface CompactedTopic {
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
     CompletableFuture<Entry> readLastEntryOfCompactedLedger();
+    Optional<Position> getCompactionHorizon();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index a6d6fc9..aac213f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.compaction;
 
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -311,9 +310,8 @@ public class CompactedTopicImpl implements CompactedTopic {
             .compare(p.getEntryId(), m.getEntryId()).result();
     }
 
-    @VisibleForTesting
-    PositionImpl getCompactionHorizon() {
-        return this.compactionHorizon;
+    public synchronized Optional<Position> getCompactionHorizon() {
+        return Optional.ofNullable(this.compactionHorizon);
     }
     private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index d44868a..4d00d28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -254,7 +254,8 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
         Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
                 newCompactedLedger.getId());
-        Assert.assertEquals(compactedTopic.getCompactionHorizon(), newHorizon);
+        Assert.assertTrue(compactedTopic.getCompactionHorizon().isPresent());
+        Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), newHorizon);
         compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
 
         // old ledger should be deleted, new still there
@@ -688,4 +689,49 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         Assert.assertFalse(reader.hasMessageAvailable());
     }
 
+    @Test
+    public void testHasMessageAvailableWithNullValueMessage() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testHasMessageAvailable-" +
+                UUID.randomUUID();
+        final int numMessages = 10;
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync();
+        }
+
+        for (int i = numMessages / 2; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + "").value(null).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages / 2);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+        for (int i = numMessages / 2; i < numMessages; ++i) {
+            reader.readNext();
+        }
+        Assert.assertFalse(reader.hasMessageAvailable());
+        Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
+    }
+
 }