You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/27 12:30:48 UTC
[pulsar] branch master updated: Fix race condition of the
SystemTopicBasedTopicPoliciesService (#11097)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 81063c0 Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)
81063c0 is described below
commit 81063c04870ba7fa26222e57e4d4e94145e0a1e0
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jun 27 20:30:15 2021 +0800
Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)
### Motivation
Currently, we are triggering the reader to read more messages not waiting for the init policies cache to complete,
This might lead to the init process got hasMessages=true but not able to read the message since the message has been
consumed by the read more entries process will lead to the `topic policy cache not init` exception.
Here are the details of the race condition:
https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190
https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248
https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275
---
.../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java | 5 +++--
.../test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java | 5 +++--
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 87ff3b8..e2d2e74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -188,7 +188,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
result.completeExceptionally(ex);
} else {
initPolicesCache(reader, result);
- readMorePolicies(reader);
+ result.thenRun(() -> readMorePolicies(reader));
}
});
}
@@ -254,6 +254,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
}
refreshTopicPoliciesCache(msg);
+ notifyListener(msg);
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -264,9 +265,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
}
- future.complete(null);
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+ future.complete(null);
}
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f66f464..ecd53b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -947,8 +947,9 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
+ InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
Awaitility.await()
- .untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));
+ .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies));
// restart broker, policy should still take effect
restartBroker();
@@ -956,7 +957,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
// Trigger the cache init.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
- InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
+
Awaitility.await()
.untilAsserted(() -> {
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();