You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/14 16:13:56 UTC
(pulsar) branch master updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 837f8bca7dd [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
837f8bca7dd is described below
commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Apr 15 00:13:49 2024 +0800
[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
---
.../pulsar/broker/service/BrokerService.java | 4 +-
.../service/persistent/MessageDeduplication.java | 18 ++-
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../DeduplicationDisabledBrokerLevelTest.java | 161 +++++++++++++++++++++
4 files changed, 177 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b4d0f38b4a4..2687532693a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -631,8 +631,10 @@ public class BrokerService implements Closeable {
}
protected void startDeduplicationSnapshotMonitor() {
+ // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
+ // scheduled task runs.
int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
- if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+ if (interval > 0) {
this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
.name("deduplication-snapshot-monitor")
.numThreads(1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
// Replay all the entries and apply all the sequence ids updates
log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
- CompletableFuture<Void> future = new CompletableFuture<>();
+ CompletableFuture<Position> future = new CompletableFuture<>();
replayCursor(future);
- return future;
+ return future.thenAccept(lastPosition -> {
+ if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+ snapshotCounter = 0;
+ takeSnapshot(lastPosition);
+ }
+ });
}
/**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
*
* @param future future to trigger when the replay is complete
*/
- private void replayCursor(CompletableFuture<Void> future) {
+ private void replayCursor(CompletableFuture<Position> future) {
managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+ Position lastPosition = null;
for (Entry entry : entries) {
ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
producerRemoved(producerName);
-
+ snapshotCounter++;
+ lastPosition = entry.getPosition();
entry.release();
}
@@ -191,7 +197,7 @@ public class MessageDeduplication {
pulsar.getExecutor().execute(() -> replayCursor(future));
} else {
// Done replaying
- future.complete(null);
+ future.complete(lastPosition);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3c9ab04d79a..e4441969101 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -208,7 +208,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private volatile List<String> shadowTopics;
private final TopicName shadowSourceTopic;
- static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
+ public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
public static boolean isDedupCursorName(String name) {
return DEDUPLICATION_CURSOR_NAME.equals(name);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
new file mode 100644
index 00000000000..2ce4ea9b00b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests message deduplication when it is disabled at the broker level
+ * ({@code brokerDeduplicationEnabled=false}) but enabled for individual topics through topic policies.
+ */
+public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase {
+
+    // Interval, in seconds, of the broker's deduplication snapshot monitor. Mutable because
+    // testSnapshotCounterAfterUnload temporarily raises it and restarts the broker.
+    private int deduplicationSnapshotFrequency = 5;
+    // Value passed to setBrokerDeduplicationEntriesInterval; the tests assume it is the number of
+    // entries after which MessageDeduplication takes a snapshot (its "snapshotCounter" threshold).
+    private final int brokerDeduplicationEntriesInterval = 1000;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Applies the base test configuration, then disables deduplication broker-wide and installs this
+     * class's snapshot settings.
+     */
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setBrokerDeduplicationEnabled(false);
+        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency);
+        this.conf.setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
+    }
+
+    /**
+     * With deduplication enabled only at topic level, the deduplication cursor's mark-delete position
+     * must catch up with the last confirmed entry instead of accumulating a backlog.
+     */
+    @Test
+    public void testNoBacklogOnDeduplication() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        // Deduplication is "false" at broker level but "true" at topic level, so it is enabled.
+        admin.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl dedupCursor =
+                    (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(dedupCursor);
+        });
+
+        // Verify: the deduplication cursor acknowledges published messages automatically.
+        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+            producer.send("1");
+            producer.send("2");
+            producer.send("3");
+        }
+        final ManagedCursorImpl cursor =
+                (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+        // Allow up to three snapshot-monitor periods for the mark-delete position to catch up.
+        Awaitility.await().atMost(Duration.ofSeconds(deduplicationSnapshotFrequency * 3)).untilAsserted(() -> {
+            PositionImpl lastConfirmed = (PositionImpl) ml.getLastConfirmedEntry();
+            PositionImpl markDeleted = (PositionImpl) cursor.getMarkDeletedPosition();
+            assertTrue(lastConfirmed.compareTo(markDeleted) <= 0);
+        });
+
+        // cleanup.
+        admin.topics().delete(topic);
+    }
+
+    /**
+     * Entries replayed while a reloaded topic recovers its deduplication state must count towards the
+     * snapshot interval. A snapshot is then taken once the interval is reached, and the fully
+     * acknowledged ledger can be trimmed.
+     */
+    @Test
+    public void testSnapshotCounterAfterUnload() throws Exception {
+        final int originalDeduplicationSnapshotFrequency = deduplicationSnapshotFrequency;
+        try {
+            // Push the time-based snapshot monitor far into the future, so that only the entries
+            // interval can trigger a snapshot during this test.
+            deduplicationSnapshotFrequency = 3600;
+            cleanup();
+            setup();
+            runSnapshotCounterAfterUnload();
+        } finally {
+            // Restore the original configuration and restart the broker even if an assertion failed,
+            // so the other tests in this class run against the expected setup.
+            deduplicationSnapshotFrequency = originalDeduplicationSnapshotFrequency;
+            cleanup();
+            setup();
+        }
+    }
+
+    /** Scenario of {@link #testSnapshotCounterAfterUnload()}, run against a freshly restarted broker. */
+    private void runSnapshotCounterAfterUnload() throws Exception {
+        // Create a topic and wait until deduplication has started.
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic1 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
+        admin.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl cursor1 =
+                    (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor1);
+        });
+        final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();
+
+        // 1. Send 999 messages, one less than "brokerDeduplicationEntriesInterval".
+        // 2. Unload the topic.
+        // 3. Send 1 more message; 1000 messages (999 + 1) are now not covered by a snapshot.
+        // 4. Verify that a snapshot has been taken.
+        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
+            // step 1.
+            for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
+                producer.send(i + "");
+            }
+            int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
+            assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
+
+            // step 2.
+            admin.topics().unload(topic);
+            final PersistentTopic persistentTopic2 =
+                    (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+            final ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
+            final MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();
+            admin.topicPolicies().setDeduplicationStatus(topic, true);
+            Awaitility.await().untilAsserted(() -> {
+                ManagedCursorImpl cursor2 =
+                        (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+                assertNotNull(cursor2);
+            });
+
+            // step 3.
+            producer.send("last message");
+            ml2.trimConsumedLedgersInBackground(new CompletableFuture<>());
+
+            // step 4.
+            Awaitility.await().untilAsserted(() -> {
+                int snapshotCounter2 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
+                assertTrue(snapshotCounter2 < brokerDeduplicationEntriesInterval);
+                // Verify: the previous ledger is removed because all of its messages have been acked.
+                assertEquals(ml2.getLedgersInfo().size(), 1);
+            });
+        }
+
+        // cleanup.
+        admin.topics().delete(topic);
+    }
+}