You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/05 12:21:34 UTC

[pulsar] 03/04: [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea40033a5e456b147496269892c3cd77b0236aed
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 5 00:55:59 2021 +0800

    [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)
    
    * [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction.
    
    For a non-durable cursor, the ledger trimming task moves the cursor past the removed ledgers
    so that readers do not introduce backlog and data can be removed once it exceeds the retention;
    see #6787 for more details.
    
    But for a topic with compaction enabled, this causes the reader to skip the compacted data.
    The newly added test illustrates this problem well. When reading compacted data, reading a message ID
    that is earlier than the first message ID of the original data is normal behavior, so we should not
    move forward a cursor that reads the compacted data.
    
    * Fix checkstyle.
    
    * Fix tests.
    
    * Fix tests.
    
    (cherry picked from commit a6b1b34a5c028b74bd44c5b8f32b42752b6cec14)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../mledger/impl/NonDurableCursorImpl.java         | 10 +++
 .../AbstractDispatcherSingleActiveConsumer.java    | 10 ++-
 ...onPersistentDispatcherSingleActiveConsumer.java |  2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  4 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 73 ++++++++++++++++++++++
 6 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 126acdb..fa62ebe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2543,7 +2543,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
-                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
+                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
+                    && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
                 cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 15b1f04..0f7ffe41 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
+    private volatile boolean readCompacted;
+
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
                          PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
         super(bookkeeper, config, ledger, cursorName);
@@ -116,6 +118,14 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
+    public void setReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+    }
+
+    public boolean isReadCompacted() {
+        return readCompacted;
+    }
+
     @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 690a598..4c7ea45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,6 +26,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -45,7 +47,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     protected boolean isKeyHashRangeFiltered = false;
     protected CompletableFuture<Void> closeFuture = null;
     protected final int partitionIndex;
-
+    protected final ManagedCursor cursor;
     // This dispatcher supports both the Exclusive and Failover subscription types
     protected final SubType subscriptionType;
 
@@ -59,12 +61,13 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                   String topicName, Subscription subscription,
-                                                  ServiceConfiguration serviceConfig) {
+                                                  ServiceConfiguration serviceConfig, ManagedCursor cursor) {
         super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
         this.subscriptionType = subscriptionType;
+        this.cursor = cursor;
         ACTIVE_CONSUMER_UPDATER.set(this, null);
     }
 
@@ -178,6 +181,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
+        if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
+            ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
+        }
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 6094ab7..5cdbff1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -44,7 +44,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                        NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), null);
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 04b8f5a..1f6cbec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,7 +58,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         implements Dispatcher, ReadEntriesCallback {
 
     protected final PersistentTopic topic;
-    protected final ManagedCursor cursor;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -73,11 +72,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
                                                     PersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
-        this.cursor = cursor;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
             TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 608d99b..cbe7372 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -509,4 +509,77 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 200;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        // Make sure we have more than 1 original ledgers
+        admin.topics().unload(topic);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin.topics().getInternalStats(topic).ledgers.size(), 2);
+        });
+
+        // Start a new reader to reading messages
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .receiverQueueSize(10)
+                .create();
+
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys * 2) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys * 3);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        // The reader should read all 600 keys
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            System.out.println(reader.readNext().getKey());
+            received++;
+        }
+        Assert.assertEquals(received, keys * 3);
+    }
 }