You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/14 16:13:56 UTC

(pulsar) branch master updated: [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 837f8bca7dd [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
837f8bca7dd is described below

commit 837f8bca7ddbbad4354f9a89e36fcd6aea1be85c
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Apr 15 00:13:49 2024 +0800

    [fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (#22479)
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../service/persistent/MessageDeduplication.java   |  18 ++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../DeduplicationDisabledBrokerLevelTest.java      | 161 +++++++++++++++++++++
 4 files changed, 177 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b4d0f38b4a4..2687532693a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -631,8 +631,10 @@ public class BrokerService implements Closeable {
     }
 
     protected void startDeduplicationSnapshotMonitor() {
+        // Deduplication may be enabled at the namespace or topic level even when it is disabled at the broker
+        // level, so keep this scheduled task running regardless of the broker-level setting.
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
-        if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
+        if (interval > 0) {
             this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
                     .name("deduplication-snapshot-monitor")
                     .numThreads(1)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 802dd917961..e508661364d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -157,9 +157,14 @@ public class MessageDeduplication {
 
         // Replay all the entries and apply all the sequence ids updates
         log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<Position> future = new CompletableFuture<>(); // last replayed position, or null
         replayCursor(future);
-        return future;
+        return future.thenAccept(lastPosition -> { // snapshot now if replay alone reached snapshotInterval
+            if (lastPosition != null && snapshotCounter >= snapshotInterval) {
+                snapshotCounter = 0;
+                takeSnapshot(lastPosition);
+            }
+        });
     }
 
     /**
@@ -168,11 +173,11 @@ public class MessageDeduplication {
      *
      * @param future future to trigger when the replay is complete
      */
-    private void replayCursor(CompletableFuture<Void> future) {
+    private void replayCursor(CompletableFuture<Position> future) {
         managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
-
+                Position lastPosition = null; // stays null if this batch has no entries
                 for (Entry entry : entries) {
                     ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
                     MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -182,7 +187,8 @@ public class MessageDeduplication {
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
                     producerRemoved(producerName);
-
+                    snapshotCounter++; // replayed entries count toward the snapshot interval
+                    lastPosition = entry.getPosition();
                     entry.release();
                 }
 
@@ -191,7 +197,7 @@ public class MessageDeduplication {
                     pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
-                    future.complete(null);
+                    future.complete(lastPosition); // last entry of the final batch, or null
                 }
             }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3c9ab04d79a..e4441969101 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -208,7 +208,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private volatile List<String> shadowTopics;
     private final TopicName shadowSourceTopic;
 
-    static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
+    public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; // public: read by tests in other packages
 
     public static boolean isDedupCursorName(String name) {
         return DEDUPLICATION_CURSOR_NAME.equals(name);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
new file mode 100644
index 00000000000..2ce4ea9b00b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DeduplicationDisabledBrokerLevelTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase {
+
+    private int deduplicationSnapshotFrequency = 5; // seconds; raised temporarily by testSnapshotCounterAfterUnload
+    private int brokerDeduplicationEntriesInterval = 1000;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    protected void doInitConf() throws Exception { // NOTE(review): add @Override if this overrides a base hook
+        this.conf.setBrokerDeduplicationEnabled(false);
+        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency);
+        this.conf.setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
+    }
+
+    @Test
+    public void testNoBacklogOnDeduplication() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        // Deduplication settings:
+        //   broker level: false
+        //   topic level: true
+        // The topic-level policy wins, so deduplication is enabled for this topic.
+        admin.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() -> { // wait until the dedup cursor has been created
+            ManagedCursorImpl cursor =
+                    (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor);
+        });
+
+        // Verify: messages are acknowledged on the deduplication cursor automatically, so it has no backlog.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        producer.close();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+        Awaitility.await().atMost(Duration.ofSeconds(deduplicationSnapshotFrequency * 3)).untilAsserted(() -> {
+            PositionImpl LAC = (PositionImpl) ml.getLastConfirmedEntry(); // last confirmed entry
+            PositionImpl cursorMD = (PositionImpl) cursor.getMarkDeletedPosition(); // mark-delete position
+            assertTrue(LAC.compareTo(cursorMD) <= 0);
+        });
+
+        // cleanup.
+        admin.topics().delete(topic);
+    }
+
+    @Test
+    public void testSnapshotCounterAfterUnload() throws Exception {
+        final int originalDeduplicationSnapshotFrequency = deduplicationSnapshotFrequency;
+        deduplicationSnapshotFrequency = 3600; // 1 hour: keep the periodic snapshot task out of the way
+        cleanup();
+        setup();
+
+        // Create a topic and wait until deduplication has started.
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        final PersistentTopic persistentTopic1 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
+        admin.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl cursor1 =
+                    (ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor1);
+        });
+        final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();
+
+        // 1. Send 999 messages, one fewer than "brokerDeduplicationEntriesInterval".
+        // 2. Unload the topic.
+        // 3. Send 1 more message, so 1000 messages are now not covered by a snapshot.
+        // 4. Verify that a snapshot has been taken (the counter was reset).
+        // step 1.
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
+            producer.send(i + "");
+        }
+        int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
+        assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
+        admin.topics().unload(topic); // step 2.
+        PersistentTopic persistentTopic2 =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
+        ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
+        MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();
+        admin.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl cursor =
+                    (ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
+            assertNotNull(cursor);
+        });
+        // step 3.
+        producer.send("last message");
+        ml2.trimConsumedLedgersInBackground(new CompletableFuture<>());
+        // step 4.
+        Awaitility.await().untilAsserted(() -> {
+            int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
+            assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval);
+            // Verify: the previous ledger will be removed because all messages have been acked.
+            assertEquals(ml2.getLedgersInfo().size(), 1);
+        });
+
+        // cleanup. NOTE(review): skipped if an assertion above fails; consider try/finally.
+        producer.close();
+        admin.topics().delete(topic);
+        deduplicationSnapshotFrequency = originalDeduplicationSnapshotFrequency;
+        cleanup();
+        setup();
+    }
+}