You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2024/03/04 10:25:58 UTC
(pulsar) 03/05: [fix][broker] Support setting `autoSkipNonRecoverableData` dynamically in expiryMonitor (#21991)
This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 09cb54148c1db3b3f18dd8991c59b788c975d6b3
Author: atomchen <49...@qq.com>
AuthorDate: Sun Feb 18 15:51:49 2024 +0800
[fix][broker] Support setting `autoSkipNonRecoverableData` dynamically in expiryMonitor (#21991)
Co-authored-by: atomchchen <at...@tencent.com>
(cherry picked from commit 220a3d601602d67f5f44516c5d9895dfaa270380)
---
.../persistent/PersistentMessageExpiryMonitor.java | 10 ++++++---
.../service/persistent/PersistentTopicTest.java | 26 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index ea45e9536a7..ac391c10503 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
@@ -51,7 +52,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
- private final boolean autoSkipNonRecoverableData;
private final PersistentSubscription subscription;
private static final int FALSE = 0;
@@ -71,8 +71,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
this.subscription = subscription;
this.msgExpired = new Rate();
this.totalMsgExpired = new LongAdder();
+ }
+
+ @VisibleForTesting
+ public boolean isAutoSkipNonRecoverableData() {
// check to avoid test failures
- this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
+ return this.cursor.getManagedLedger() != null
&& this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
}
@@ -230,7 +234,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception);
}
- if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+ if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent()
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
exception.getMessage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index fe84aeb1df7..717dfc28ac8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -635,4 +635,30 @@ public class PersistentTopicTest extends BrokerTestBase {
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
}
+
+ @Test
+ public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exception {
+ pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
+ final String topicName = "persistent://prop/ns-abc/testAutoSkipNonRecoverableData";
+ final String subName = "test_sub";
+
+ Consumer<byte[]> subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentSubscription subscription = persistentTopic.getSubscription(subName);
+
+ assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+ assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+ String key = "autoSkipNonRecoverableData";
+ admin.brokers().updateDynamicConfiguration(key, "true");
+ Awaitility.await()
+ .untilAsserted(() -> assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true"));
+
+ assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
+ assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
+
+ subscribe.close();
+ admin.topics().delete(topicName);
+ }
}