You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/08/29 08:03:42 UTC

[pulsar] 01/02: [fix] [broker] Producer is blocked on creation because backlog exceeded on topic, when dedup is enabled and no producer is there (#20951)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f68589ea280eea63ee786236d2c2023b7001ecb6
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Aug 23 18:03:29 2023 -0700

    [fix] [broker] Producer is blocked on creation because backlog exceeded on topic, when dedup is enabled and no producer is there (#20951)
    
    (cherry picked from commit 30073dbac0e941869b43e090d2682935e8f094e5)
---
 .../pulsar/broker/service/BacklogQuotaManager.java | 28 ++++++++++++++-
 .../service/persistent/MessageDeduplication.java   | 11 ++++++
 .../broker/service/persistent/PersistentTopic.java |  4 +++
 .../broker/service/BacklogQuotaManagerTest.java    | 40 ++++++++++++++++++----
 4 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index bc2541c802e..6ad1697adfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -103,7 +103,10 @@ public class BacklogQuotaManager {
             break;
         case producer_exception:
         case producer_request_hold:
-            disconnectProducers(persistentTopic);
+            if (!advanceSlowestSystemCursor(persistentTopic)) {
+                // The slowest cursor is not a system cursor; disconnect producers to apply backpressure.
+                disconnectProducers(persistentTopic);
+            }
             break;
         default:
             break;
@@ -268,4 +271,27 @@ public class BacklogQuotaManager {
 
         });
     }
+
+    /**
+     * Triggers a deduplication snapshot when the slowest cursor is the dedup cursor.
+     * NOTE(review): the snapshot presumably moves that cursor forward; confirm in MessageDeduplication.
+     * @param persistentTopic the topic whose managed ledger's slowest cursor is inspected
+     * @return true if the slowest cursor is the dedup cursor; false otherwise, including when there is none
+     */
+    private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) {
+
+        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+        if (slowestConsumer == null) {
+            return false;
+        }
+
+        if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+            persistentTopic.getMessageDeduplication().takeSnapshot();
+            return true;
+        }
+
+        // Only the dedup cursor is handled; other system cursors (replicator, compaction) may need checks here.
+        return false;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 490be4a8876..d1b4b74945f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
@@ -130,6 +131,9 @@ public class MessageDeduplication {
 
     private final String replicatorPrefix;
 
+
+    private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
+
     public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
         this.pulsar = pulsar;
         this.topic = topic;
@@ -406,6 +410,11 @@ public class MessageDeduplication {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
         }
+
+        if (!snapshotTaking.compareAndSet(false, true)) {
+            return;
+        }
+
         Map<String, Long> snapshot = new TreeMap<>();
         highestSequencedPersisted.forEach((producerName, sequenceId) -> {
             if (snapshot.size() < maxNumberOfProducers) {
@@ -420,11 +429,13 @@ public class MessageDeduplication {
                     log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
                 }
                 lastSnapshotTimestamp = System.currentTimeMillis();
+                snapshotTaking.set(false);
             }
 
             @Override
             public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                 log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
+                snapshotTaking.set(false);
             }
         }, null);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 593f741ce39..ffb6828c8c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -194,6 +194,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private final TopicName shadowSourceTopic;
 
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
+
+    public static boolean isDedupCursorName(String name) {
+        return DEDUPLICATION_CURSOR_NAME.equals(name); // constant-first equals: a null name yields false
+    }
     private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
 
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index f3463ee121d..0ac5fdaef15 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -127,8 +127,8 @@ public class BacklogQuotaManagerTest {
             config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
             config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-            config.setSystemTopicEnabled(false);
-            config.setTopicLevelPoliciesEnabled(false);
+            config.setSystemTopicEnabled(true);
+            config.setTopicLevelPoliciesEnabled(true);
             config.setForceDeleteNamespaceAllowed(true);
 
             pulsar = new PulsarService(config);
@@ -1169,8 +1169,13 @@ public class BacklogQuotaManagerTest {
         assertTrue(gotException, "backlog exceeded exception did not occur");
     }
 
-    @Test
-    public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
+    @DataProvider(name = "dedupTestSet")
+    public static Object[][] dedupTestSet() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; // one row per dedup status: on, then off
+    }
+
+    @Test(dataProvider = "dedupTestSet")
+    public void testProducerExceptionAndThenUnblockSizeQuota(boolean dedupTestSet) throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
                 new HashMap<>());
         admin.namespaces().setBacklogQuota("prop/quotahold",
@@ -1186,9 +1191,12 @@ public class BacklogQuotaManagerTest {
         boolean gotException = false;
 
         Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
-
         byte[] content = new byte[1024];
         Producer<byte[]> producer = createProducer(client, topic1);
+
+        admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet);
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+
         for (int i = 0; i < 10; i++) {
             producer.send(content);
         }
@@ -1207,6 +1215,7 @@ public class BacklogQuotaManagerTest {
         }
 
         assertTrue(gotException, "backlog exceeded exception did not occur");
+        assertFalse(producer.isConnected());
         // now remove backlog and ensure that producer is unblocked;
 
         TopicStats stats = getTopicStats(topic1);
@@ -1223,14 +1232,33 @@ public class BacklogQuotaManagerTest {
         Exception sendException = null;
         gotException = false;
         try {
-            for (int i = 0; i < 5; i++) {
+            for (int i = 0; i < 10; i++) {
                 producer.send(content);
+                Message<?> msg = consumer.receive();
+                consumer.acknowledge(msg);
             }
         } catch (Exception e) {
             gotException = true;
             sendException = e;
         }
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         assertFalse(gotException, "unable to publish due to " + sendException);
+
+        gotException = false;
+        long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
+        try {
+            // try to send over backlog quota and make sure it passes
+            producer.send(content);
+            producer.send(content);
+        } catch (PulsarClientException ce) {
+            assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
+                    || ce instanceof PulsarClientException.TimeoutException, ce.getMessage());
+            gotException = true;
+            sendException = ce;
+        }
+        assertFalse(gotException, "unable to publish due to " + sendException);
+        assertEquals(lastDisconnectedTimestamp, producer.getLastDisconnectedTimestamp());
+
     }
 
     @Test