You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:16 UTC

[pulsar] 11/29: [fix][broker]Fix topic policies update not check message expiry (#15941)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 51c1985356fbb50e647f135c8addcebcab962f98
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jun 15 14:35:03 2022 +0800

    [fix][broker]Fix topic policies update not check message expiry (#15941)
    
    (cherry picked from commit cb0cffd6a03799dbbffa54813ebaddba0535787e)
---
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../pulsar/broker/service/MessageTTLTest.java      | 34 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a82bdb8d286..ad7903ba8d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3070,7 +3070,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
             }
             replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
-
+            checkMessageExpiry();
             if (policies.getReplicationClusters() != null) {
                 checkReplicationAndRetryOnFailure();
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index e05ec328b41..31556197486 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -18,19 +18,26 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import com.google.common.collect.Lists;
-
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.testng.Assert.assertEquals;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -102,4 +109,27 @@ public class MessageTTLTest extends BrokerTestBase {
 
     }
 
+    @Test
+    public void testTTLPoliciesUpdate() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/testTTLPoliciesUpdate";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+
+        PersistentTopic topicRefMock = spy(topicRef);
+
+        // Namespace policies must be initiated from admin, which contains `replication_clusters`
+        Policies policies = admin.namespaces().getPolicies(namespace);
+        policies.message_ttl_in_seconds = 10;
+        topicRefMock.onPoliciesUpdate(policies);
+        verify(topicRefMock, times(1)).checkMessageExpiry();
+
+        TopicPolicies topicPolicies = new TopicPolicies();
+        topicPolicies.setMessageTTLInSeconds(5);
+        topicRefMock.onUpdate(topicPolicies);
+        verify(topicRefMock, times(2)).checkMessageExpiry();
+    }
 }