You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:24:31 UTC
[pulsar] 01/09: [pulsar-broker] Handle multiple topic creation for
same topic-name in broker (#10847)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 887f8ec398fad0062fa04b945099afbb888b929e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 17 23:07:34 2021 -0700
[pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847)
### Motivation
When the broker takes a long time to load a topic and the load times out before the topic future is completed, the broker keeps multiple instances of the same topic open, does not clean up the timed-out topic, fails to create the replicator producer on the successfully created topic with the error `repl-producer is already connected to topic`, and builds up a replication backlog.
```
19:16:10.107 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger myProp/global/myNs/persistent/myTopic
:
19:17:10.953 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [myProp/global/myNs/persistent/myTopic] Successfully initialize managed ledger
:
19:17:10.065 [pulsar-io-23-30] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.196.133.62:47278] Failed to create topic persistent://myProp/global/myNs/myTopic, producerId=382
:
19:17:10.954 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
:
19:17:10.955 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
:
19:17:51.532 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
:
19:17:51.530 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
:
07:25:51.377 [pulsar-io-23-5] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://myProp/global/myNs/myTopic] [pulsar.repl.west1] Failed to create producer: Producer with name 'pulsarrepl.west1' is already connected to topic
```
### Modification
- Stop the replicator of a topic whose creation failed or timed out
- Clean up the failed topic from the broker's topic cache
### Result
- Successfully create replicator producer for the topic and avoid creating replication backlog
(cherry picked from commit 1447e6b1061babedc08901c44f16164bb4c4e2df)
---
.../pulsar/broker/service/BrokerService.java | 14 +++-
.../pulsar/broker/service/ReplicatorTest.java | 80 ++++++++++++++++++++++
.../org/apache/pulsar/common/util/FutureUtil.java | 31 +++++++++
3 files changed, 122 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 575738a..9c9d482 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -970,7 +970,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
- CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
+ CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.futureWithDeadline(executor());
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
@@ -1233,8 +1233,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
- addTopicToStatsMaps(topicName, persistentTopic);
- topicFuture.complete(Optional.of(persistentTopic));
+ if (topicFuture.isCompletedExceptionally()) {
+ log.warn("{} future is already completed with failure {}, closing the topic",
+ topic, FutureUtil.getException(topicFuture));
+ persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+ topics.remove(topic, topicFuture);
+ });
+ } else {
+ addTopicToStatsMaps(topicName, persistentTopic);
+ topicFuture.complete(Optional.of(persistentTopic));
+ }
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check failed."
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index e5219d5..d261e85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.service;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -34,11 +36,13 @@ import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
+import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,6 +56,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -65,6 +70,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -1041,6 +1047,80 @@ public class ReplicatorTest extends ReplicatorTestBase {
nonPersistentProducer2.close();
}
+    /**
+     * Checks that a topic future which timed out while loading is removed from the broker's topic cache,
+     * and that the same topic can be subscribed to once its managed ledger becomes available.
+     *
+     * <p>A never-completed future is planted (via reflection) in the managed-ledger factory's
+     * {@code ledgers} map under the topic's ledger name; presumably this makes the factory wait on it
+     * when the topic is loaded - confirm against ManagedLedgerFactoryImpl. The future is completed near
+     * the end of the test, after which a subscribe is expected to succeed.
+     */
+    @Test
+    public void testCleanupTopic() throws Exception {
+
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String namespace = "pulsar/ns-" + System.nanoTime();
+        final String topicName = "persistent://" + namespace + "/cleanTopic";
+        final String topicMlName = namespace + "/persistent/cleanTopic";
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));
+
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // The client is created only for this test, so it is closed in the finally block below.
+        try {
+            long topicLoadTimeoutSeconds = 3;
+            config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+            config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+
+            ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory()
+                    .getManagedLedgerFactory();
+            Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
+            ledgersField.setAccessible(true);
+            // The cast cannot be checked at runtime because of erasure; the element type is taken on trust
+            // from the declaration of the reflected field.
+            @SuppressWarnings("unchecked")
+            ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers =
+                    (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField.get(mlFactory);
+            CompletableFuture<ManagedLedgerImpl> mlFuture = new CompletableFuture<>();
+            ledgers.put(topicMlName, mlFuture);
+
+            try {
+                client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
+                        .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
+                fail("consumer should fail due to topic loading failure");
+            } catch (Exception e) {
+                // Ok
+            }
+
+            // Poll the broker's topic map for the pending topic future, sleeping 0s, 1s, 2s, ... between tries.
+            CompletableFuture<Optional<Topic>> topicFuture = null;
+            for (int i = 0; i < 5; i++) {
+                topicFuture = pulsar1.getBrokerService().getTopics().get(topicName);
+                if (topicFuture != null) {
+                    break;
+                }
+                Thread.sleep(i * 1000);
+            }
+            // Without this check a missing future would surface as a NullPointerException that the
+            // catch (Exception) below swallows, letting the test continue without a timed-out future.
+            assertNotNull(topicFuture, "topic future should be registered in the broker's topic map");
+
+            try {
+                topicFuture.get();
+                fail("topic creation should fail");
+            } catch (Exception e) {
+                // Ok
+            }
+
+            final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
+            // timeout topic future should be removed from cache
+            retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture,
+                    5, 1000);
+
+            assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));
+
+            // The managed-ledger future is still pending, so a second subscribe attempt must fail as well.
+            try {
+                client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
+                        .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
+                fail("consumer should fail due to topic loading failure");
+            } catch (Exception e) {
+                // Ok
+            }
+
+            // Complete the planted future with a real managed ledger; the next subscribe should succeed.
+            ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
+            mlFuture.complete(ml);
+
+            Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                    .subscriptionType(SubscriptionType.Shared).subscribeAsync()
+                    .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
+
+            consumer.close();
+        } finally {
+            client1.close();
+        }
+    }
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 7356950..0c3a0c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.common.util;
import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -162,4 +164,33 @@ public class FutureUtil {
return this;
}
}
+
+    /**
+     * Creates a future that is failed with {@code exp} if nothing else completes it within the given delay.
+     *
+     * @param executor scheduler on which the deadline task is scheduled
+     * @param delay    time to wait before failing the future, expressed in {@code unit}
+     * @param unit     time unit of {@code delay}
+     * @param exp      exception the future is completed with when the deadline expires
+     * @param <T>      result type of the returned future
+     * @return a new, not yet completed future guarded by the deadline
+     */
+    public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, Long delay,
+            TimeUnit unit, Exception exp) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        // completeExceptionally() has no effect on an already completed future, so no isDone() guard is needed.
+        // Declared as a Runnable so that the schedule(Runnable, long, TimeUnit) overload is selected.
+        Runnable failOnDeadline = () -> future.completeExceptionally(exp);
+        ScheduledFuture<?> deadlineTask = executor.schedule(failOnDeadline, delay, unit);
+        // Once the future has settled the deadline task is useless: cancel it so the scheduler stops
+        // keeping the future (and whatever result it holds) reachable until the delay elapses.
+        future.whenComplete((result, error) -> deadlineTask.cancel(false));
+        return future;
+    }
+
+    /**
+     * Creates a future with a default deadline of 60000 milliseconds; if it is not completed by then it is
+     * failed with a {@code TimeoutException}.
+     *
+     * @param executor scheduler on which the deadline task is scheduled
+     * @param <T>      result type of the returned future
+     * @return a new, not yet completed future guarded by the default deadline
+     */
+    public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor) {
+        final Exception deadlineExceeded = new TimeoutException("Future didn't finish within deadline");
+        return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS, deadlineExceeded);
+    }
+
+    /**
+     * Extracts the failure of a future that has completed exceptionally.
+     *
+     * @param future the future to inspect; may be {@code null}
+     * @param <T>    result type of the future
+     * @return the cause the future failed with, or the {@code CancellationException} if it was cancelled;
+     *         empty if the future is {@code null}, still pending, or completed normally
+     */
+    public static <T> Optional<Throwable> getException(CompletableFuture<T> future) {
+        if (future == null || !future.isCompletedExceptionally()) {
+            return Optional.empty();
+        }
+        try {
+            // The future is already completed, so this is only used to obtain the failure.
+            future.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            return Optional.of(e);
+        } catch (java.util.concurrent.CancellationException e) {
+            // A cancelled future also reports isCompletedExceptionally(), but get() signals it with this
+            // unchecked exception rather than an ExecutionException. Fully qualified so that the file's
+            // import list does not have to change.
+            return Optional.of(e);
+        } catch (ExecutionException e) {
+            return Optional.ofNullable(e.getCause());
+        }
+        return Optional.empty();
+    }
}