You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:14 UTC

[pulsar] 20/27: [Broker] Fix race condition in BrokerService topic cache (#9565)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 61d800ea3c4195abcb6aa0d30f1763263a63d84c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Feb 12 23:08:29 2021 +0200

    [Broker] Fix race condition in BrokerService topic cache (#9565)
    
    * Fix race condition in BrokerService topic cache
    
    * Add test that reproduces the topic cache race condition
    
    * Use logger for logging exception
    
    * Address review comments
    
    - remove timeout
    - show thread info in log statement
      - add index of thread as part of thread's name
    
    * Reset state before running BrokerServiceTests that check stats
    
    (cherry picked from commit cee6377de13180805a122842816291b169e1aea2)
---
 .../pulsar/broker/service/BrokerService.java       | 19 ++++++-
 .../pulsar/broker/service/BrokerServiceTest.java   | 65 ++++++++++++++++++----
 2 files changed, 72 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 72d6b50..f2c9e4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -756,7 +756,24 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                     // Exceptional topics should be recreated.
                     topics.remove(topic, topicFuture);
                 } else {
-                    return topicFuture;
+                    // a non-existing topic in the cache shouldn't prevent creating a topic
+                    if (createIfMissing) {
+                        if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
+                            return topicFuture;
+                        } else {
+                            return topicFuture.thenCompose(value -> {
+                                if (!value.isPresent()) {
+                                    // retry and create topic
+                                    return getTopic(topic, createIfMissing);
+                                } else {
+                                    // in-progress future completed successfully
+                                    return CompletableFuture.completedFuture(value);
+                                }
+                            });
+                        }
+                    } else {
+                        return topicFuture;
+                    }
                 }
             }
             final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9cbc624..0af9b1a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -24,11 +24,14 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -49,7 +52,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -75,23 +79,17 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import lombok.Cleanup;
-
 /**
  */
+@Slf4j
 public class BrokerServiceTest extends BrokerTestBase {
 
     private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
@@ -111,6 +109,13 @@ public class BrokerServiceTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    // Resets shared test state on demand by running cleanup() followed by setup().
+    // Required because setup & cleanup use BeforeClass & AfterClass (i.e. they are not re-run per test).
+    private void resetState() throws Exception {
+        cleanup();
+        setup();
+    }
+
     @Test
     public void testOwnedNsCheck() throws Exception {
         final String topic = "persistent://prop/ns-abc/successTopic";
@@ -147,6 +152,9 @@ public class BrokerServiceTest extends BrokerTestBase {
 
     @Test
     public void testBrokerServicePersistentTopicStats() throws Exception {
+        // this test might fail if there are stats from other tests
+        resetState();
+
         final String topicName = "persistent://prop/ns-abc/successTopic";
         final String subName = "successSub";
 
@@ -245,6 +253,9 @@ public class BrokerServiceTest extends BrokerTestBase {
 
     @Test
     public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
+        // this test might fail if there are stats from other tests
+        resetState();
+
         final String topicName = "persistent://prop/ns-abc/successSharedTopic";
         final String subName = "successSharedSub";
 
@@ -382,6 +393,9 @@ public class BrokerServiceTest extends BrokerTestBase {
 
     @Test
     public void testBrokerServiceNamespaceStats() throws Exception {
+        // this test fails if there is state from other tests
+        resetState();
+
         final int numBundles = 4;
         final String ns1 = "prop/stats1";
         final String ns2 = "prop/stats2";
@@ -1001,4 +1015,33 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
         Assert.assertTrue(sb.toString().contains("test_metrics"));
     }
+
+    @Test
+    public void shouldNotPreventCreatingTopicWhenNonexistingTopicIsCached() throws Exception {
+        // repeat with a fresh topic name per iteration to raise the odds of hitting the topic cache race
+        for (int i = 0; i < 100; i++) {
+            final String topicName = "persistent://prop/ns-abc/topic-caching-test-topic" + i;
+            CountDownLatch latch = new CountDownLatch(1);
+            Thread getStatsThread = new Thread(() -> {
+                try {
+                    latch.countDown();
+                    // short delay to provoke the race; may not reproduce everywhere (it did on an i7-10750H CPU)
+                    // NOTE(review): fail() below throws in this thread, not the test thread - confirm it is noticed
+                    Thread.sleep(1);
+                    admin.topics().getStats(topicName);
+                    fail("The topic should not exist yet.");
+                } catch (PulsarAdminException.NotFoundException e) {
+                    // expected: getStats was served before the topic existed
+                } catch (PulsarAdminException | InterruptedException e) {
+                    log.error("Exception in {}", Thread.currentThread().getName(), e);
+                }
+            }, "getStatsThread#" + i);
+            getStatsThread.start();
+            latch.await();
+            @Cleanup
+            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+            assertNotNull(producer);
+            getStatsThread.join();
+        }
+    }
 }