You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/04/08 10:37:34 UTC
(pulsar) 01/02: [fix][broker] Fix consumer that stops receiving messages while processing large backlogs (#22454)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7259fd0b26a246340262242a792dd57b35af2970
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Apr 8 18:22:05 2024 +0800
[fix][broker] Fix consumer that stops receiving messages while processing large backlogs (#22454)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +---
.../service/persistent/PersistentSubscription.java | 4 +-
.../service/persistent/PersistentTopicTest.java | 56 +++++++++++++++++++++-
4 files changed, 65 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dbdde7cf707..9bbcda327f0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -985,6 +985,11 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
}
+ if (isClosed()) {
+ callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
+ return;
+ }
+
if (!hasMoreEntries()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3d66bc8d6c0..426ac8df218 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1032,6 +1032,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
+ consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
+ cursor.setState(ManagedCursorImpl.State.Closed);
cursors.removeCursor(consumerName);
deactivateCursorByName(consumerName);
callback.deleteCursorComplete(ctx);
@@ -3804,13 +3805,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
public void addWaitingCursor(ManagedCursorImpl cursor) {
- if (cursor instanceof NonDurableCursorImpl) {
- if (cursor.isActive()) {
- this.waitingCursors.add(cursor);
- }
- } else {
- this.waitingCursors.add(cursor);
- }
+ this.waitingCursors.add(cursor);
}
public boolean isCursorActive(ManagedCursor cursor) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 3bedf497f87..fdb977aa366 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,7 +308,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
- topic.getManagedLedger().removeWaitingCursor(cursor);
if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well. No need to check for active
@@ -338,11 +337,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
if (!isResetCursor) {
try {
topic.getManagedLedger().deleteCursor(cursor.getName());
+ topic.getManagedLedger().removeWaitingCursor(cursor);
} catch (InterruptedException | ManagedLedgerException e) {
log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
}
}
});
+ } else {
+ topic.getManagedLedger().removeWaitingCursor(cursor);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 618fe8006a6..4c0d8eb6a49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -112,6 +113,11 @@ public class PersistentTopicTest extends BrokerTestBase {
super.internalCleanup();
}
+ @Override protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setManagedLedgerCursorBackloggedThreshold(10);
+ }
+
/**
* Test validates that broker cleans up topic which failed to unload while bundle unloading.
*
@@ -680,7 +686,7 @@ public class PersistentTopicTest extends BrokerTestBase {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
doAnswer((invocation) -> {
- Thread.sleep(10_000);
+ Thread.sleep(5_000);
invocation.callRealMethod();
return null;
}).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
@@ -707,4 +713,52 @@ public class PersistentTopicTest extends BrokerTestBase {
assertEquals(ledger.getWaitingCursorsCount(), 0);
});
}
+
+ @Test
+ public void testAddWaitingCursorsForNonDurable2() throws Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName = "persistent://prop/ns-test/testAddWaitingCursors2";
+ admin.topics().createNonPartitionedTopic(topicName);
+ pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub-1").subscribe().close();
+ @Cleanup
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
+ for (int i = 0; i < 100; i ++) {
+ producer.sendAsync("test-" + i);
+ }
+ @Cleanup
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionMode(SubscriptionMode.NonDurable)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("sub-2").subscribe();
+ int count = 0;
+ while(true) {
+ final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.acknowledge(msg);
+ count++;
+ } else {
+ break;
+ }
+ }
+ Assert.assertEquals(count, 100);
+ Thread.sleep(3_000);
+ for (int i = 0; i < 100; i ++) {
+ producer.sendAsync("test-" + i);
+ }
+ while(true) {
+ final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.acknowledge(msg);
+ count++;
+ } else {
+ break;
+ }
+ }
+ Assert.assertEquals(count, 200);
+ }
}