You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/09 12:08:46 UTC

(pulsar) 02/02: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4d8a1d8bc6a2d88d0fa82f390ae57bf4c5541d54
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

    [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
    
    (cherry picked from commit 57a616eaa79096af5b49db89c99cd39ccc94ec00)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java    | 56 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b104cff8aeb..66179b07c93 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -980,6 +980,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
             }
 
+            if (isClosed()) {
+                callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
+                return;
+            }
+
             if (!hasMoreEntries()) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0729f7cb664..8ae57f1c165 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1031,6 +1031,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     + consumerName), ctx);
             return;
         } else if (!cursor.isDurable()) {
+            cursor.setState(ManagedCursorImpl.State.Closed);
             cursors.removeCursor(consumerName);
             deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
@@ -3750,13 +3751,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public void addWaitingCursor(ManagedCursorImpl cursor) {
-        if (cursor instanceof NonDurableCursorImpl) {
-            if (cursor.isActive()) {
-                this.waitingCursors.add(cursor);
-            }
-        } else {
-            this.waitingCursors.add(cursor);
-        }
+        this.waitingCursors.add(cursor);
     }
 
     public boolean isCursorActive(ManagedCursor cursor) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index f00c95f7e68..8a481e32c33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -306,7 +306,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
-            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the subscription as well
@@ -335,11 +334,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
                     if (!isResetCursor) {
                         try {
                             topic.getManagedLedger().deleteCursor(cursor.getName());
+                            topic.getManagedLedger().removeWaitingCursor(cursor);
                         } catch (InterruptedException | ManagedLedgerException e) {
                             log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
                         }
                     }
                 });
+            } else {
+                topic.getManagedLedger().removeWaitingCursor(cursor);
             }
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 618fe8006a6..4c0d8eb6a49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -112,6 +113,11 @@ public class PersistentTopicTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Override protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerCursorBackloggedThreshold(10);
+    }
+
     /**
      * Test validates that broker cleans up topic which failed to unload while bundle unloading.
      *
@@ -680,7 +686,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
         final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
         doAnswer((invocation) -> {
-            Thread.sleep(10_000);
+            Thread.sleep(5_000);
             invocation.callRealMethod();
             return null;
         }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
@@ -707,4 +713,52 @@ public class PersistentTopicTest extends BrokerTestBase {
                     assertEquals(ledger.getWaitingCursorsCount(), 0);
         });
     }
+
+    @Test
+    public void testAddWaitingCursorsForNonDurable2() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = "persistent://prop/ns-test/testAddWaitingCursors2";
+        admin.topics().createNonPartitionedTopic(topicName);
+        pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1").subscribe().close();
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        @Cleanup
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("sub-2").subscribe();
+        int count = 0;
+        while(true) {
+            final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 100);
+        Thread.sleep(3_000);
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        while(true) {
+            final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 200);
+    }
 }