You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:24:31 UTC

[pulsar] 01/09: [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 887f8ec398fad0062fa04b945099afbb888b929e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 17 23:07:34 2021 -0700

    [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847)
    
    ### Motivation
    
    When the broker takes a longer time to load the topic and times out before completing the topic future, then the broker keeps multiple topics opened , doesn't clean up timed out topic, fail to create replicator producer on successfully created topic with error: `repl-producer is already connected to topic`, builds replication backlog.
    
    ```
    19:16:10.107 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger myProp/global/myNs/persistent/myTopic
    :
    9:17:10.953 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [myProp/global/myNs/persistent/myTopic] Successfully initialize managed ledger
    :
    19:17:10.065 [pulsar-io-23-30] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.196.133.62:47278] Failed to create topic persistent://myProp/global/myNs/myTopic, producerId=382
    :
    19:17:10.954 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
    :
    19:17:10.955 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
    :
    19:17:51.532 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
    :
    19:17:51.530 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
    :
    07:25:51.377 [pulsar-io-23-5] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://myProp/global/myNs/myTopic] [pulsar.repl.west1] Failed to create producer: Producer with name 'pulsarrepl.west1' is already connected to topic
    ```
    
    ### Modification
    - Stopped replicator for failed and timed-out topic
    - Clean up failed topic
    
    ### Result
    - Successfully create replicator producer for the topic and avoid creating replication backlog
    
    (cherry picked from commit 1447e6b1061babedc08901c44f16164bb4c4e2df)
---
 .../pulsar/broker/service/BrokerService.java       | 14 +++-
 .../pulsar/broker/service/ReplicatorTest.java      | 80 ++++++++++++++++++++++
 .../org/apache/pulsar/common/util/FutureUtil.java  | 31 +++++++++
 3 files changed, 122 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 575738a..9c9d482 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -970,7 +970,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
-        CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
+        CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.futureWithDeadline(executor());
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
@@ -1233,8 +1233,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                                     long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
                                             - topicCreateTimeMs;
                                     pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-                                    addTopicToStatsMaps(topicName, persistentTopic);
-                                    topicFuture.complete(Optional.of(persistentTopic));
+                                    if (topicFuture.isCompletedExceptionally()) {
+                                        log.warn("{} future is already completed with failure {}, closing the topic",
+                                                topic, FutureUtil.getException(topicFuture));
+                                        persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                            topics.remove(topic, topicFuture);
+                                        });
+                                    } else {
+                                        addTopicToStatsMaps(topicName, persistentTopic);
+                                        topicFuture.complete(Optional.of(persistentTopic));
+                                    }
                                 }).exceptionally((ex) -> {
                                     log.warn(
                                             "Replication or dedup check failed."
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index e5219d5..d261e85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -34,11 +36,13 @@ import io.netty.buffer.ByteBuf;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.List;
+import java.util.Optional;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -52,6 +56,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -65,6 +70,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -1041,6 +1047,80 @@ public class ReplicatorTest extends ReplicatorTestBase {
         nonPersistentProducer2.close();
     }
 
+    @Test
+    public void testCleanupTopic() throws Exception {
+
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String namespace = "pulsar/ns-" + System.nanoTime();
+        final String topicName = "persistent://" + namespace + "/cleanTopic";
+        final String topicMlName = namespace + "/persistent/cleanTopic";
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));
+
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        long topicLoadTimeoutSeconds = 3;
+        config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+        config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory()
+                .getManagedLedgerFactory();
+        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
+        ledgersField.setAccessible(true);
+        ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
+                .get(mlFactory);
+        CompletableFuture<ManagedLedgerImpl> mlFuture = new CompletableFuture<>();
+        ledgers.put(topicMlName, mlFuture);
+
+        try {
+            Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
+            fail("consumer should fail due to topic loading failure");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        CompletableFuture<Optional<Topic>> topicFuture = null;
+        for (int i = 0; i < 5; i++) {
+            topicFuture = pulsar1.getBrokerService().getTopics().get(topicName);
+            if (topicFuture != null) {
+                break;
+            }
+            Thread.sleep(i * 1000);
+        }
+
+        try {
+            topicFuture.get();
+            fail("topic creation should fail");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
+        // timeout topic future should be removed from cache
+        retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
+                1000);
+
+        assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));
+
+        try {
+            Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
+            fail("consumer should fail due to topic loading failure");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
+        mlFuture.complete(ml);
+
+        Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).subscribeAsync()
+                .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
+
+        consumer.close();
+    }
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 7356950..0c3a0c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.common.util;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -162,4 +164,33 @@ public class FutureUtil {
             return this;
         }
     }
+
+    public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, Long delay,
+            TimeUnit unit, Exception exp) {
+        CompletableFuture<T> future = new CompletableFuture<T>();
+        executor.schedule(() -> {
+            if (!future.isDone()) {
+                future.completeExceptionally(exp);
+            }
+        }, delay, unit);
+        return future;
+    }
+
+    public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor) {
+        return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS,
+                new TimeoutException("Future didn't finish within deadline"));
+    }
+
+    public static <T> Optional<Throwable> getException(CompletableFuture<T> future) {
+        if (future != null && future.isCompletedExceptionally()) {
+            try {
+                future.get();
+            } catch (InterruptedException e) {
+                return Optional.ofNullable(e);
+            } catch (ExecutionException e) {
+                return Optional.ofNullable(e.getCause());
+            }
+        }
+        return Optional.empty();
+    }
 }