You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:14 UTC
[pulsar] 20/27: [Broker] Fix race condition in BrokerService topic cache (#9565)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 61d800ea3c4195abcb6aa0d30f1763263a63d84c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Feb 12 23:08:29 2021 +0200
[Broker] Fix race condition in BrokerService topic cache (#9565)
* Fix race condition in BrokerService topic cache
* Add test that reproduces the topic cache race condition
* Use logger for logging exception
* Address review comments
- remove timeout
- show thread info in log statement
- add index of thread as part of thread's name
* Reset state before running BrokerServiceTests that check stats
(cherry picked from commit cee6377de13180805a122842816291b169e1aea2)
---
.../pulsar/broker/service/BrokerService.java | 19 ++++++-
.../pulsar/broker/service/BrokerServiceTest.java | 65 ++++++++++++++++++----
2 files changed, 72 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 72d6b50..f2c9e4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -756,7 +756,24 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
// Exceptional topics should be recreated.
topics.remove(topic, topicFuture);
} else {
- return topicFuture;
+ // a non-existing topic in the cache shouldn't prevent creating a topic
+ if (createIfMissing) {
+ if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
+ return topicFuture;
+ } else {
+ return topicFuture.thenCompose(value -> {
+ if (!value.isPresent()) {
+ // retry and create topic
+ return getTopic(topic, createIfMissing);
+ } else {
+ // in-progress future completed successfully
+ return CompletableFuture.completedFuture(value);
+ }
+ });
+ }
+ } else {
+ return topicFuture;
+ }
}
}
final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9cbc624..0af9b1a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -24,11 +24,14 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -49,7 +52,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -75,23 +79,17 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
-import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import lombok.Cleanup;
-
/**
*/
+@Slf4j
public class BrokerServiceTest extends BrokerTestBase {
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
@@ -111,6 +109,13 @@ public class BrokerServiceTest extends BrokerTestBase {
super.internalCleanup();
}
+ // method for resetting state explicitly
+ // this is required since setup & cleanup are using BeforeClass & AfterClass
+ private void resetState() throws Exception {
+ cleanup();
+ setup();
+ }
+
@Test
public void testOwnedNsCheck() throws Exception {
final String topic = "persistent://prop/ns-abc/successTopic";
@@ -147,6 +152,9 @@ public class BrokerServiceTest extends BrokerTestBase {
@Test
public void testBrokerServicePersistentTopicStats() throws Exception {
+ // this test might fail if there are stats from other tests
+ resetState();
+
final String topicName = "persistent://prop/ns-abc/successTopic";
final String subName = "successSub";
@@ -245,6 +253,9 @@ public class BrokerServiceTest extends BrokerTestBase {
@Test
public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
+ // this test might fail if there are stats from other tests
+ resetState();
+
final String topicName = "persistent://prop/ns-abc/successSharedTopic";
final String subName = "successSharedSub";
@@ -382,6 +393,9 @@ public class BrokerServiceTest extends BrokerTestBase {
@Test
public void testBrokerServiceNamespaceStats() throws Exception {
+ // this test fails if there is state from other tests
+ resetState();
+
final int numBundles = 4;
final String ns1 = "prop/stats1";
final String ns2 = "prop/stats2";
@@ -1001,4 +1015,33 @@ public class BrokerServiceTest extends BrokerTestBase {
}
Assert.assertTrue(sb.toString().contains("test_metrics"));
}
+
+ @Test
+ public void shouldNotPreventCreatingTopicWhenNonexistingTopicIsCached() throws Exception {
+ // run multiple iterations to increase the chance of reproducing a race condition in the topic cache
+ for (int i = 0; i < 100; i++) {
+ final String topicName = "persistent://prop/ns-abc/topic-caching-test-topic" + i;
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread getStatsThread = new Thread(() -> {
+ try {
+ latch.countDown();
+ // create race condition with a short delay
+ // the bug might not reproduce in all environments, this works at least on i7-10750H CPU
+ Thread.sleep(1);
+ admin.topics().getStats(topicName);
+ fail("The topic should not exist yet.");
+ } catch (PulsarAdminException.NotFoundException e) {
+ // expected exception
+ } catch (PulsarAdminException | InterruptedException e) {
+ log.error("Exception in {}", Thread.currentThread().getName(), e);
+ }
+ }, "getStatsThread#" + i);
+ getStatsThread.start();
+ latch.await();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ assertNotNull(producer);
+ getStatsThread.join();
+ }
+ }
}