You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/29 00:51:41 UTC

[pulsar] branch master updated: Fix replay topic policy message not work (#11136)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e263e09  Fix replay topic policy message not work (#11136)
e263e09 is described below

commit e263e09b1fd6fbe99617edb29b966bed3f5d7c0e
Author: hangc0276 <ch...@apache.org>
AuthorDate: Tue Jun 29 08:51:06 2021 +0800

    Fix replay topic policy message not work (#11136)
    
    ### Motivation
    When a topic-level retention policy is set for a topic and the broker is restarted, the topic-level retention policy no longer takes effect.
    The reason is that when the `__change_events` topic messages are replayed during the `initPolicesCache` stage, the service creates a reader, reads messages from the earliest position, and notifies listeners of each message so that the policy is updated for each topic. Updating a topic policy calls the `getTopicPolicies` method:
    ```Java
    public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
            if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
                    && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
                throw new TopicPoliciesCacheNotInitException();
            }
            return policiesCache.get(topicName);
        }
    ```
    This method checks `policyCacheInitMap` to determine whether the policy cache for the topic's namespace has been initialized.
    However, until all messages have been replayed, the `policyCacheInitMap` entry remains in the not-initialized state.
    Thus `getTopicPolicies` throws `TopicPoliciesCacheNotInitException`, and replaying the topic policy message fails.
    
    ### Modification
    1. Replay all policy messages after `policyCacheInitMap` is marked as initialized.
    2. Add a retention policy check to the broker restart test.
---
 .../SystemTopicBasedTopicPoliciesService.java      | 10 ++++++++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 25 ++++++++++++++++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index e2d2e74..6a5d0d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -254,7 +254,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                     }
                     refreshTopicPoliciesCache(msg);
-                    notifyListener(msg);
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Loop next event reading for system topic.",
                                 reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -267,6 +266,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 }
                 policyCacheInitMap.computeIfPresent(
                         reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
+                // replay policy message
+                policiesCache.forEach(((topicName, topicPolicies) -> {
+                    if (listeners.get(topicName) != null) {
+                        for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
+                            listener.onUpdate(topicPolicies);
+                        }
+                    }
+                }));
                 future.complete(null);
             }
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ecd53b2..3934d3b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -935,6 +935,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         Awaitility.await()
                 .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
 
+        // set namespace level inactive topic policies
         InactiveTopicPolicies inactiveTopicPolicies =
                 new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true);
         admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies);
@@ -943,13 +944,28 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
                 .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getInactiveTopicPolicies(myNamespace).getInactiveTopicDeleteMode(),
                         InactiveTopicDeleteMode.delete_when_subscriptions_caught_up));
 
+        // set namespace retention policies
+        final RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
+        admin.namespaces().setRetention(myNamespace, retentionPolicies);
+        Awaitility.await()
+                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(myNamespace),
+                        retentionPolicies));
+
+        // set topic level inactive topic policies
         inactiveTopicPolicies =
                 new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
         admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
 
         InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
         Awaitility.await()
-                .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies));
+                .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic),
+                        finalInactiveTopicPolicies));
+
+        // set topic level retention policies
+        final RetentionPolicies finalRetentionPolicies = new RetentionPolicies(20, -1);
+        admin.topics().setRetention(topic, finalRetentionPolicies);
+        Awaitility.await()
+                .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(topic), finalRetentionPolicies));
 
         // restart broker, policy should still take effect
         restartBroker();
@@ -957,11 +973,16 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         // Trigger the cache init.
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
 
-
+        // check inactive topic policies and retention policies.
         Awaitility.await()
                 .untilAsserted(() -> {
                     PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+                    ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
                     Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies);
+                    Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(),
+                            finalRetentionPolicies.getRetentionSizeInMB());
+                    Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
+                            TimeUnit.MINUTES.toMillis(finalRetentionPolicies.getRetentionTimeInMinutes()));
                 });
 
         producer.close();