You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/08/29 08:03:42 UTC
[pulsar] 01/02: [fix] [broker] Producer is blocked on creation because backlog exceeded on topic, when dedup is enabled and no producer is there (#20951)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f68589ea280eea63ee786236d2c2023b7001ecb6
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Aug 23 18:03:29 2023 -0700
[fix] [broker] Producer is blocked on creation because backlog exceeded on topic, when dedup is enabled and no producer is there (#20951)
(cherry picked from commit 30073dbac0e941869b43e090d2682935e8f094e5)
---
.../pulsar/broker/service/BacklogQuotaManager.java | 28 ++++++++++++++-
.../service/persistent/MessageDeduplication.java | 11 ++++++
.../broker/service/persistent/PersistentTopic.java | 4 +++
.../broker/service/BacklogQuotaManagerTest.java | 40 ++++++++++++++++++----
4 files changed, 76 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index bc2541c802e..6ad1697adfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -103,7 +103,10 @@ public class BacklogQuotaManager {
break;
case producer_exception:
case producer_request_hold:
- disconnectProducers(persistentTopic);
+ if (!advanceSlowestSystemCursor(persistentTopic)) {
+ // The slowest is not a system cursor. Disconnecting producers to put backpressure.
+ disconnectProducers(persistentTopic);
+ }
break;
default:
break;
@@ -268,4 +271,27 @@ public class BacklogQuotaManager {
});
}
+
+ /**
+ * Takes a dedup snapshot (intended to advance the dedup cursor) if that cursor is the topic's slowest one.
+ *
+ * @param persistentTopic topic whose managed ledger is queried for its slowest cursor
+ * @return true if the slowest cursor is the dedup cursor; false if there is none or it is another cursor
+ */
+ private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) {
+
+ ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+ if (slowestConsumer == null) {
+ return false;
+ }
+
+ if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+ persistentTopic.getMessageDeduplication().takeSnapshot();
+ return true;
+ }
+
+ // Only the dedup cursor is handled for now; replicator and compaction cursors may need handling here too.
+ return false;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 490be4a8876..d1b4b74945f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
@@ -130,6 +131,9 @@ public class MessageDeduplication {
private final String replicatorPrefix;
+
+ private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
+
public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
this.pulsar = pulsar;
this.topic = topic;
@@ -406,6 +410,11 @@ public class MessageDeduplication {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
}
+
+ if (!snapshotTaking.compareAndSet(false, true)) {
+ return;
+ }
+
Map<String, Long> snapshot = new TreeMap<>();
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (snapshot.size() < maxNumberOfProducers) {
@@ -420,11 +429,13 @@ public class MessageDeduplication {
log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
}
lastSnapshotTimestamp = System.currentTimeMillis();
+ snapshotTaking.set(false);
}
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
+ snapshotTaking.set(false);
}
}, null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 593f741ce39..ffb6828c8c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -194,6 +194,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private final TopicName shadowSourceTopic;
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
+
+ public static boolean isDedupCursorName(String name) {
+ return DEDUPLICATION_CURSOR_NAME.equals(name); // constant-first equals: a null name yields false
+ }
private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index f3463ee121d..0ac5fdaef15 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -127,8 +127,8 @@ public class BacklogQuotaManagerTest {
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- config.setSystemTopicEnabled(false);
- config.setTopicLevelPoliciesEnabled(false);
+ config.setSystemTopicEnabled(true);
+ config.setTopicLevelPoliciesEnabled(true);
config.setForceDeleteNamespaceAllowed(true);
pulsar = new PulsarService(config);
@@ -1169,8 +1169,13 @@ public class BacklogQuotaManagerTest {
assertTrue(gotException, "backlog exceeded exception did not occur");
}
- @Test
- public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
+ @DataProvider(name = "dedupTestSet")
+ public static Object[][] dedupTestSet() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; // one run per deduplication status
+ }
+
+ @Test(dataProvider = "dedupTestSet")
+ public void testProducerExceptionAndThenUnblockSizeQuota(boolean dedupTestSet) throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
new HashMap<>());
admin.namespaces().setBacklogQuota("prop/quotahold",
@@ -1186,9 +1191,12 @@ public class BacklogQuotaManagerTest {
boolean gotException = false;
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
-
byte[] content = new byte[1024];
Producer<byte[]> producer = createProducer(client, topic1);
+
+ admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet);
+ Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+
for (int i = 0; i < 10; i++) {
producer.send(content);
}
@@ -1207,6 +1215,7 @@ public class BacklogQuotaManagerTest {
}
assertTrue(gotException, "backlog exceeded exception did not occur");
+ assertFalse(producer.isConnected());
// now remove backlog and ensure that producer is unblocked;
TopicStats stats = getTopicStats(topic1);
@@ -1223,14 +1232,33 @@ public class BacklogQuotaManagerTest {
Exception sendException = null;
gotException = false;
try {
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 10; i++) {
producer.send(content);
+ Message<?> msg = consumer.receive();
+ consumer.acknowledge(msg);
}
} catch (Exception e) {
gotException = true;
sendException = e;
}
+ Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
assertFalse(gotException, "unable to publish due to " + sendException);
+
+ gotException = false;
+ long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
+ try {
+ // try to send over backlog quota and make sure it passes
+ producer.send(content);
+ producer.send(content);
+ } catch (PulsarClientException ce) {
+ assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
+ || ce instanceof PulsarClientException.TimeoutException, ce.getMessage());
+ gotException = true;
+ sendException = ce;
+ }
+ assertFalse(gotException, "unable to publish due to " + sendException);
+ assertEquals(lastDisconnectedTimestamp, producer.getLastDisconnectedTimestamp());
+
}
@Test