You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:16 UTC
[pulsar] 11/29: [fix][broker]Fix topic policies update not check message expiry (#15941)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 51c1985356fbb50e647f135c8addcebcab962f98
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jun 15 14:35:03 2022 +0800
[fix][broker]Fix topic policies update not check message expiry (#15941)
(cherry picked from commit cb0cffd6a03799dbbffa54813ebaddba0535787e)
---
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/broker/service/MessageTTLTest.java | 34 ++++++++++++++++++++--
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a82bdb8d286..ad7903ba8d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3070,7 +3070,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
-
+ checkMessageExpiry();
if (policies.getReplicationClusters() != null) {
checkReplicationAndRetryOnFailure();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index e05ec328b41..31556197486 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -18,19 +18,26 @@
*/
package org.apache.pulsar.broker.service;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import com.google.common.collect.Lists;
-
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.testng.Assert.assertEquals;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -102,4 +109,27 @@ public class MessageTTLTest extends BrokerTestBase {
}
+ @Test
+ public void testTTLPoliciesUpdate() throws Exception {
+ final String namespace = "prop/ns-abc";
+ final String topicName = "persistent://" + namespace + "/testTTLPoliciesUpdate";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+
+ PersistentTopic topicRefMock = spy(topicRef);
+
+ // Namespace polices must be initiated from admin, which contains `replication_clusters`
+ Policies policies = admin.namespaces().getPolicies(namespace);
+ policies.message_ttl_in_seconds = 10;
+ topicRefMock.onPoliciesUpdate(policies);
+ verify(topicRefMock, times(1)).checkMessageExpiry();
+
+ TopicPolicies topicPolicies = new TopicPolicies();
+ topicPolicies.setMessageTTLInSeconds(5);
+ topicRefMock.onUpdate(topicPolicies);
+ verify(topicRefMock, times(2)).checkMessageExpiry();
+ }
}