You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/27 12:30:48 UTC

[pulsar] branch master updated: Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 81063c0  Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)
81063c0 is described below

commit 81063c04870ba7fa26222e57e4d4e94145e0a1e0
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jun 27 20:30:15 2021 +0800

    Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)
    
    ### Motivation
    
    Currently, we are triggering the reader to read more messages not waiting for the init policies cache to complete,
    This might lead to the init process got hasMessages=true but not able to read the message since the message has been
    consumed by the read more entries process will lead to the `topic policy cache not init` exception.
    
    Here are the details of the race condition:
    
    https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190
    
    https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248
    
    https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java  | 5 +++--
 .../test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java  | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 87ff3b8..e2d2e74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -188,7 +188,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         result.completeExceptionally(ex);
                     } else {
                         initPolicesCache(reader, result);
-                        readMorePolicies(reader);
+                        result.thenRun(() -> readMorePolicies(reader));
                     }
                 });
             }
@@ -254,6 +254,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                     }
                     refreshTopicPoliciesCache(msg);
+                    notifyListener(msg);
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Loop next event reading for system topic.",
                                 reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -264,9 +265,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
                 }
-                future.complete(null);
                 policyCacheInitMap.computeIfPresent(
                         reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+                future.complete(null);
             }
         });
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f66f464..ecd53b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -947,8 +947,9 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
                 new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
         admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
 
+        InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
         Awaitility.await()
-                .untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));
+                .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies));
 
         // restart broker, policy should still take effect
         restartBroker();
@@ -956,7 +957,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         // Trigger the cache init.
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
 
-        InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
+
         Awaitility.await()
                 .untilAsserted(() -> {
                     PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();