You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/09 12:08:44 UTC

(pulsar) branch branch-3.1 updated (c9b656ca8c4 -> 4d8a1d8bc6a)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from c9b656ca8c4 [improve][misc] Specify valid home dir for the default user in the Ubuntu based docker image (#22446)
     new afe7bd5a6f9 [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
     new 4d8a1d8bc6a [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   7 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   5 ++
 .../service/persistent/PersistentSubscription.java |   4 +-
 .../service/persistent/PersistentTopicTest.java    | 100 +++++++++++++++++++++
 4 files changed, 114 insertions(+), 2 deletions(-)


(pulsar) 02/02: [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4d8a1d8bc6a2d88d0fa82f390ae57bf4c5541d54
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Apr 8 18:22:05 2024 +0800

    [fix][broker] Fix consumer stops receiving messages when with large backlogs processing (#22454)
    
    (cherry picked from commit 57a616eaa79096af5b49db89c99cd39ccc94ec00)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +---
 .../service/persistent/PersistentSubscription.java |  4 +-
 .../service/persistent/PersistentTopicTest.java    | 56 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b104cff8aeb..66179b07c93 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -980,6 +980,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
             }
 
+            if (isClosed()) {
+                callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
+                return;
+            }
+
             if (!hasMoreEntries()) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0729f7cb664..8ae57f1c165 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1031,6 +1031,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     + consumerName), ctx);
             return;
         } else if (!cursor.isDurable()) {
+            cursor.setState(ManagedCursorImpl.State.Closed);
             cursors.removeCursor(consumerName);
             deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
@@ -3750,13 +3751,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public void addWaitingCursor(ManagedCursorImpl cursor) {
-        if (cursor instanceof NonDurableCursorImpl) {
-            if (cursor.isActive()) {
-                this.waitingCursors.add(cursor);
-            }
-        } else {
-            this.waitingCursors.add(cursor);
-        }
+        this.waitingCursors.add(cursor);
     }
 
     public boolean isCursorActive(ManagedCursor cursor) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index f00c95f7e68..8a481e32c33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -306,7 +306,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
-            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the subscription as well
@@ -335,11 +334,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
                     if (!isResetCursor) {
                         try {
                             topic.getManagedLedger().deleteCursor(cursor.getName());
+                            topic.getManagedLedger().removeWaitingCursor(cursor);
                         } catch (InterruptedException | ManagedLedgerException e) {
                             log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
                         }
                     }
                 });
+            } else {
+                topic.getManagedLedger().removeWaitingCursor(cursor);
             }
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 618fe8006a6..4c0d8eb6a49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -79,6 +79,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -112,6 +113,11 @@ public class PersistentTopicTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Override protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setManagedLedgerCursorBackloggedThreshold(10);
+    }
+
     /**
      * Test validates that broker cleans up topic which failed to unload while bundle unloading.
      *
@@ -680,7 +686,7 @@ public class PersistentTopicTest extends BrokerTestBase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
         final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
         doAnswer((invocation) -> {
-            Thread.sleep(10_000);
+            Thread.sleep(5_000);
             invocation.callRealMethod();
             return null;
         }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
@@ -707,4 +713,52 @@ public class PersistentTopicTest extends BrokerTestBase {
                     assertEquals(ledger.getWaitingCursorsCount(), 0);
         });
     }
+
+    @Test
+    public void testAddWaitingCursorsForNonDurable2() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = "persistent://prop/ns-test/testAddWaitingCursors2";
+        admin.topics().createNonPartitionedTopic(topicName);
+        pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.Durable)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1").subscribe().close();
+        @Cleanup
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topicName).create();
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        @Cleanup
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("sub-2").subscribe();
+        int count = 0;
+        while(true) {
+            final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 100);
+        Thread.sleep(3_000);
+        for (int i = 0; i < 100; i ++) {
+            producer.sendAsync("test-" + i);
+        }
+        while(true) {
+            final Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                count++;
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(count, 200);
+    }
 }


(pulsar) 01/02: [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit afe7bd5a6f9a50416498e385b449354e808f60f7
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Mar 28 06:53:21 2024 +0800

    [fix][broker] Check cursor state before adding it to the `waitingCursors` (#22191)
    
    (cherry picked from commit b702d440dc5e5a4cfd845bf60d5e310efe665ff5)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++++
 .../service/persistent/PersistentTopicTest.java    | 46 ++++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f67f534f86d..b104cff8aeb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -986,7 +986,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                             name);
                 }
                 // Let the managed ledger know we want to be notified whenever a new entry is published
-                ledger.waitingCursors.add(this);
+                ledger.addWaitingCursor(this);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Skip notification registering since we do have entries available",
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 72e0b2c9d06..0729f7cb664 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3749,6 +3749,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.waitingCursors.remove(cursor);
     }
 
+    public void addWaitingCursor(ManagedCursorImpl cursor) {
+        if (cursor instanceof NonDurableCursorImpl) {
+            if (cursor.isActive()) {
+                this.waitingCursors.add(cursor);
+            }
+        } else {
+            this.waitingCursors.add(cursor);
+        }
+    }
+
     public boolean isCursorActive(ManagedCursor cursor) {
         return activeCursors.get(cursor.getName()) != null;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 717dfc28ac8..618fe8006a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -57,10 +57,15 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -74,6 +79,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -661,4 +667,44 @@ public class PersistentTopicTest extends BrokerTestBase {
         subscribe.close();
         admin.topics().delete(topicName);
     }
+
+    @Test
+    public void testAddWaitingCursorsForNonDurable() throws Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = "persistent://prop/ns-test/testAddWaitingCursors";
+        admin.topics().createNonPartitionedTopic(topicName);
+        final Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).join();
+        assertNotNull(topic.get());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
+        final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
+        doAnswer((invocation) -> {
+            Thread.sleep(10_000);
+            invocation.callRealMethod();
+            return null;
+        }).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
+                any(AsyncCallbacks.ReadEntriesCallback.class), any(Object.class), any(PositionImpl.class));
+        Field cursorField = ManagedLedgerImpl.class.getDeclaredField("cursors");
+        cursorField.setAccessible(true);
+        ManagedCursorContainer container = (ManagedCursorContainer) cursorField.get(ledger);
+        container.removeCursor("sub-2");
+        container.add(spyCursor, null);
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("sub-2").subscribe();
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("test");
+        producer.close();
+        final Message<String> receive = consumer.receive();
+        assertEquals("test", receive.getValue());
+        consumer.close();
+        Awaitility.await()
+                .pollDelay(5, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    assertEquals(ledger.getWaitingCursorsCount(), 0);
+        });
+    }
 }