You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/29 00:51:41 UTC
[pulsar] branch master updated: Fix replay topic policy message not
work (#11136)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e263e09 Fix replay topic policy message not work (#11136)
e263e09 is described below
commit e263e09b1fd6fbe99617edb29b966bed3f5d7c0e
Author: hangc0276 <ch...@apache.org>
AuthorDate: Tue Jun 29 08:51:06 2021 +0800
Fix replay topic policy message not work (#11136)
### Motivation
When set topic level retention policy for a topic and restart the broker, the topic level retention policy doesn't work.
The reason is when replay the __change_events topic message on `initPolicesCache` stage, it create a reader and read message from earliest and notify the message to update policy for each topic. On update topic policy, it will call getTopicPolicies method.
```Java
public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
}
return policiesCache.get(topicName);
}
```
This method will check `policyCacheInitMap` whether init or not for specific namespace.
However, before replay all message completely, the `policyCacheInitMap` keep in not init stage.
Thus the `getTopicPolicies` will throw `TopicPoliciesCacheNotInitException` and the topic policy message will replay failed.
### Modification
1. replay all policy messages after `policyCacheInitMap` initiated.
2. add retention policy check test for broker restart check.
---
.../SystemTopicBasedTopicPoliciesService.java | 10 ++++++++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 25 ++++++++++++++++++++--
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index e2d2e74..6a5d0d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -254,7 +254,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
}
refreshTopicPoliciesCache(msg);
- notifyListener(msg);
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -267,6 +266,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
+ // replay policy message
+ policiesCache.forEach(((topicName, topicPolicies) -> {
+ if (listeners.get(topicName) != null) {
+ for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
+ listener.onUpdate(topicPolicies);
+ }
+ }
+ }));
future.complete(null);
}
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ecd53b2..3934d3b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -935,6 +935,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
Awaitility.await()
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ // set namespace level inactive topic policies
InactiveTopicPolicies inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true);
admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies);
@@ -943,13 +944,28 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
.untilAsserted(() -> Assert.assertEquals(admin.namespaces().getInactiveTopicPolicies(myNamespace).getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up));
+ // set namespace retention policies
+ final RetentionPolicies retentionPolicies = new RetentionPolicies(10, -1);
+ admin.namespaces().setRetention(myNamespace, retentionPolicies);
+ Awaitility.await()
+ .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(myNamespace),
+ retentionPolicies));
+
+ // set topic level inactive topic policies
inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
Awaitility.await()
- .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies));
+ .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic),
+ finalInactiveTopicPolicies));
+
+ // set topic level retention policies
+ final RetentionPolicies finalRetentionPolicies = new RetentionPolicies(20, -1);
+ admin.topics().setRetention(topic, finalRetentionPolicies);
+ Awaitility.await()
+ .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(topic), finalRetentionPolicies));
// restart broker, policy should still take effect
restartBroker();
@@ -957,11 +973,16 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
// Trigger the cache init.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
-
+ // check inactive topic policies and retention policies.
Awaitility.await()
.untilAsserted(() -> {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies);
+ Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(),
+ finalRetentionPolicies.getRetentionSizeInMB());
+ Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
+ TimeUnit.MINUTES.toMillis(finalRetentionPolicies.getRetentionTimeInMinutes()));
});
producer.close();