You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/06 02:14:54 UTC

[pulsar] branch branch-2.7 updated: Do not move the non-durable cursor position when trimming ledgers while topic with compaction. (#16403)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 83cb1f8f97e Do not move the non-durable cursor position when trimming ledgers while topic with compaction. (#16403)
83cb1f8f97e is described below

commit 83cb1f8f97e0f8d88525bcaf6d212a9d63691cdd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jul 6 10:14:48 2022 +0800

    Do not move the non-durable cursor position when trimming ledgers while topic with compaction. (#16403)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java    |  3 ++-
 .../apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java |  6 +++++-
 .../service/AbstractDispatcherSingleActiveConsumer.java      | 12 +++++++++---
 .../NonPersistentDispatcherSingleActiveConsumer.java         |  2 +-
 .../persistent/PersistentDispatcherSingleActiveConsumer.java |  2 +-
 5 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7bdc5c9ef11..f8d7098d042 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2292,7 +2292,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
-                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
+                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
+                    && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
                 cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 28ed13d808c..263e6faa605 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
-    private final boolean readCompacted;
+    private volatile boolean readCompacted;
 
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
                          PositionImpl startCursorPosition, PulsarApi.CommandSubscribe.InitialPosition initialPosition,
@@ -120,6 +120,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
+    public void setReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+    }
+
     public boolean isReadCompacted() {
         return readCompacted;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5d486528f7f..0a432aa065b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,6 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -47,7 +50,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     protected boolean isKeyHashRangeFiltered = false;
     protected CompletableFuture<Void> closeFuture = null;
     protected final int partitionIndex;
-
+    protected final ManagedCursor cursor;
     // This dispatcher supports both the Exclusive and Failover subscription types
     protected final SubType subscriptionType;
 
@@ -59,12 +62,13 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     protected boolean isFirstRead = true;
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
-            String topicName, Subscription subscription, ServiceConfiguration serviceConfig) {
+            String topicName, Subscription subscription, ServiceConfiguration serviceConfig, ManagedCursor cursor) {
         super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
         this.subscriptionType = subscriptionType;
+        this.cursor = cursor;
         ACTIVE_CONSUMER_UPDATER.set(this, null);
     }
 
@@ -180,7 +184,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
-
+        if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
+            ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
+        }
     }
 
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 69e9a95b3ae..a5a35155419 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -51,7 +51,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), null);
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 00b549d31c6..47a9cf22f14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -83,7 +83,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
             PersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);