You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/06 02:14:54 UTC
[pulsar] branch branch-2.7 updated: Do not move the non-durable cursor position when trimming ledgers while topic with compaction. (#16403)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 83cb1f8f97e Do not move the non-durable cursor position when trimming ledgers while topic with compaction. (#16403)
83cb1f8f97e is described below
commit 83cb1f8f97e0f8d88525bcaf6d212a9d63691cdd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jul 6 10:14:48 2022 +0800
Do not move the non-durable cursor position when trimming ledgers while topic with compaction. (#16403)
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ++-
.../apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java | 6 +++++-
.../service/AbstractDispatcherSingleActiveConsumer.java | 12 +++++++++---
.../NonPersistentDispatcherSingleActiveConsumer.java | 2 +-
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 2 +-
5 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7bdc5c9ef11..f8d7098d042 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2292,7 +2292,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
- && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
+ && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
+ && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 28ed13d808c..263e6faa605 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
public class NonDurableCursorImpl extends ManagedCursorImpl {
- private final boolean readCompacted;
+ private volatile boolean readCompacted;
NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition, PulsarApi.CommandSubscribe.InitialPosition initialPosition,
@@ -120,6 +120,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
callback.deleteCursorComplete(ctx);
}
+ public void setReadCompacted(boolean readCompacted) {
+ this.readCompacted = readCompacted;
+ }
+
public boolean isReadCompacted() {
return readCompacted;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5d486528f7f..0a432aa065b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,6 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -47,7 +50,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
protected boolean isKeyHashRangeFiltered = false;
protected CompletableFuture<Void> closeFuture = null;
protected final int partitionIndex;
-
+ protected final ManagedCursor cursor;
// This dispatcher supports both the Exclusive and Failover subscription types
protected final SubType subscriptionType;
@@ -59,12 +62,13 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
protected boolean isFirstRead = true;
public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
- String topicName, Subscription subscription, ServiceConfiguration serviceConfig) {
+ String topicName, Subscription subscription, ServiceConfiguration serviceConfig, ManagedCursor cursor) {
super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
this.subscriptionType = subscriptionType;
+ this.cursor = cursor;
ACTIVE_CONSUMER_UPDATER.set(this, null);
}
@@ -180,7 +184,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
consumer.notifyActiveConsumerChange(currentActiveConsumer);
}
}
-
+ if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
+ ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
+ }
}
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 69e9a95b3ae..a5a35155419 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -51,7 +51,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription,
- topic.getBrokerService().pulsar().getConfiguration());
+ topic.getBrokerService().pulsar().getConfiguration(), null);
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 00b549d31c6..47a9cf22f14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -83,7 +83,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription,
- topic.getBrokerService().pulsar().getConfiguration());
+ topic.getBrokerService().pulsar().getConfiguration(), cursor);
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);