You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2024/03/04 10:25:58 UTC

(pulsar) 03/05: [fix][broker]Support setting `autoSkipNonRecoverableData` dynamically in expiryMon… (#21991)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 09cb54148c1db3b3f18dd8991c59b788c975d6b3
Author: atomchen <49...@qq.com>
AuthorDate: Sun Feb 18 15:51:49 2024 +0800

    [fix][broker]Support setting `autoSkipNonRecoverableData` dynamically in expiryMon… (#21991)
    
    Co-authored-by: atomchchen <at...@tencent.com>
    (cherry picked from commit 220a3d601602d67f5f44516c5d9895dfaa270380)
---
 .../persistent/PersistentMessageExpiryMonitor.java | 10 ++++++---
 .../service/persistent/PersistentTopicTest.java    | 26 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index ea45e9536a7..ac391c10503 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedMap;
@@ -51,7 +52,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
     private final String topicName;
     private final Rate msgExpired;
     private final LongAdder totalMsgExpired;
-    private final boolean autoSkipNonRecoverableData;
     private final PersistentSubscription subscription;
 
     private static final int FALSE = 0;
@@ -71,8 +71,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
         this.subscription = subscription;
         this.msgExpired = new Rate();
         this.totalMsgExpired = new LongAdder();
+    }
+
+    @VisibleForTesting
+    public boolean isAutoSkipNonRecoverableData() {
         // check to avoid test failures
-        this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
+        return this.cursor.getManagedLedger() != null
                 && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
     }
 
@@ -230,7 +234,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception);
         }
-        if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+        if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent()
                 && (exception instanceof NonRecoverableLedgerException)) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
                     exception.getMessage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index fe84aeb1df7..717dfc28ac8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -635,4 +635,30 @@ public class PersistentTopicTest extends BrokerTestBase {
         assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
         assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
     }
+
+    @Test
+    public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exception {
+        pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
+        final String topicName = "persistent://prop/ns-abc/testAutoSkipNonRecoverableData";
+        final String subName = "test_sub";
+
+        Consumer<byte[]> subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentSubscription subscription = persistentTopic.getSubscription(subName);
+
+        assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+        assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+        String key = "autoSkipNonRecoverableData";
+        admin.brokers().updateDynamicConfiguration(key, "true");
+        Awaitility.await()
+                .untilAsserted(() -> assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true"));
+
+        assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+        assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+        subscribe.close();
+        admin.topics().delete(topicName);
+    }
 }