You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/08 14:01:14 UTC

(pulsar) branch master updated: [fix] [client] Messages lost due to TopicListWatcher reconnect (#21853)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 042e7691b6e [fix] [client] Messages lost due to TopicListWatcher reconnect (#21853)
042e7691b6e is described below

commit 042e7691b6ef7b7e826b3ec27740fb1f96fbc0b0
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Jan 8 22:01:05 2024 +0800

    [fix] [client] Messages lost due to TopicListWatcher reconnect (#21853)
---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   9 ++
 .../client/impl/PatternTopicsConsumerImplTest.java |  66 ++++++++++---
 .../impl/PatternMultiTopicsConsumerImpl.java       | 105 +++++++++++++++------
 .../pulsar/client/impl/TopicListWatcher.java       |   7 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   |   2 +-
 5 files changed, 148 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b8d75bd0fbc..eb75963061e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -708,5 +708,14 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         private PersistentTopic persistentTopic;
     }
 
+    protected void sleepSeconds(int seconds){
+        try {
+            Thread.sleep(1000 * seconds);
+        } catch (InterruptedException e) {
+            log.warn("This thread has been interrupted", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 451f93067b2..c708b4cae0a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -37,14 +37,18 @@ import java.util.stream.IntStream;
 import io.netty.util.Timeout;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
@@ -53,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
@@ -620,13 +625,28 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         producer3.close();
     }
 
-    @Test(timeOut = testTimeout)
-    public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
-        String key = "AutoSubscribePatternConsumer";
-        String subscriptionName = "my-ex-subscription-" + key;
+    @DataProvider(name= "delayTypesOfWatchingTopics")
+    public Object[][] delayTypesOfWatchingTopics(){
+        return new Object[][]{
+            {true},
+            {false}
+        };
+    }
 
-        Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+    @Test(timeOut = testTimeout, dataProvider = "delayTypesOfWatchingTopics")
+    public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchingTopics) throws Exception {
+        final String key = "AutoSubscribePatternConsumer";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
+
+        PulsarClient client = null;
+        if (delayWatchingTopics) {
+            client = createDelayWatchTopicsClient();
+        } else {
+            client = pulsarClient;
+        }
+
+        Consumer<byte[]> consumer = client.newConsumer()
                 .topicsPattern(pattern)
                 // Disable automatic discovery.
                 .patternAutoDiscoveryPeriod(1000)
@@ -636,12 +656,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
                 .receiverQueueSize(4)
                 .subscribe();
 
-        // Wait topic list watcher creation.
-        Awaitility.await().untilAsserted(() -> {
-            CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
-            assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
-        });
-
         // 1. create partition
         String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
         TenantInfoImpl tenantInfo = createDefaultTenantInfo();
@@ -657,7 +671,35 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
         });
 
+        // cleanup.
         consumer.close();
+        admin.topics().deletePartitionedTopic(topicName);
+        if (delayWatchingTopics) {
+            client.close();
+        }
+    }
+
+    private PulsarClient createDelayWatchTopicsClient() throws Exception {
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        return InjectedClientCnxClientBuilder.create(clientBuilder,
+            (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
+                public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList(
+                        BaseCommand command, long requestId) {
+                    // Inject a 2-second delay before sending the WatchTopicList command.
+                    CompletableFuture<CommandWatchTopicListSuccess> res = new CompletableFuture<>();
+                    new Thread(() -> {
+                        sleepSeconds(2);
+                        super.newWatchTopicList(command, requestId).whenComplete((v, ex) -> {
+                            if (ex != null) {
+                                res.completeExceptionally(ex);
+                            } else {
+                                res.complete(v);
+                            }
+                        });
+                    }).start();
+                    return res;
+                }
+            });
     }
 
     // simulate subscribe a pattern which has 3 topics, but then matched topic added in.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index c6ea6216cc1..f3ebcdee6c0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Consumer;
@@ -50,8 +51,19 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
     private final Pattern topicsPattern;
     private final TopicsChangedListener topicsChangeListener;
     private final Mode subscriptionMode;
-    private final CompletableFuture<TopicListWatcher> watcherFuture;
+    private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>();
     protected NamespaceName namespaceName;
+
+    /**
+     * There are two tasks that re-check topic changes; they never take effect at the same time.
+     * 1. {@link #recheckTopicsChangeAfterReconnect}: called after the {@link TopicListWatcher} (re)connects,
+     *     if {@link TopicListWatcher} is enabled. This backoff is used to retry
+     *     {@link #recheckTopicsChangeAfterReconnect} when it fails.
+     * 2. {@link #run}: a scheduled task that re-checks topic changes; it is used when
+     *     {@link TopicListWatcher} is disabled.
+     */
+    private final Backoff recheckPatternTaskBackoff;
+    private final AtomicInteger recheckPatternEpoch = new AtomicInteger();
     private volatile Timeout recheckPatternTimeout = null;
     private volatile String topicsHash;
 
@@ -69,6 +81,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
         this.topicsPattern = topicsPattern;
         this.topicsHash = topicsHash;
         this.subscriptionMode = subscriptionMode;
+        this.recheckPatternTaskBackoff = new BackoffBuilder()
+                .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+                .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+                .setMandatoryStop(0, TimeUnit.SECONDS)
+                .create();
 
         if (this.namespaceName == null) {
             this.namespaceName = getNameSpaceFromPattern(topicsPattern);
@@ -78,11 +95,10 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.recheckPatternTimeout = client.timer()
                 .newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
-        this.watcherFuture = new CompletableFuture<>();
         if (subscriptionMode == Mode.PERSISTENT) {
             long watcherId = client.newTopicListWatcherId();
             new TopicListWatcher(topicsChangeListener, client, topicsPattern, watcherId,
-                namespaceName, topicsHash, watcherFuture);
+                namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect());
             watcherFuture
                .thenAccept(__ -> recheckPatternTimeout.cancel())
                .exceptionally(ex -> {
@@ -99,40 +115,75 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
         return TopicName.get(pattern.pattern()).getNamespaceObject();
     }
 
+    /**
+     * Called after the {@link TopicListWatcher} (re)connects; only used when {@link TopicListWatcher} is enabled.
+     */
+    private void recheckTopicsChangeAfterReconnect() {
+        // Skip if the consumer is closing or closed.
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return;
+        }
+        // Do check.
+        recheckTopicsChange().whenComplete((ignore, ex) -> {
+            if (ex != null) {
+                log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
+                long delayMs = recheckPatternTaskBackoff.next();
+                client.timer().newTimeout(timeout -> {
+                    recheckTopicsChangeAfterReconnect();
+                }, delayMs, TimeUnit.MILLISECONDS);
+            } else {
+                recheckPatternTaskBackoff.reset();
+            }
+        });
+    }
+
     // TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change.
     @Override
     public void run(Timeout timeout) throws Exception {
         if (timeout.isCancelled()) {
             return;
         }
-        client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, topicsPattern.pattern(), topicsHash)
-            .thenCompose(getTopicsResult -> {
+        recheckTopicsChange().exceptionally(ex -> {
+            log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
+            return null;
+        }).thenAccept(__ -> {
+            // schedule the next re-check task
+            this.recheckPatternTimeout = client.timer()
+                    .newTimeout(PatternMultiTopicsConsumerImpl.this,
+                    Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        });
+    }
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
-                            namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
-                            getTopicsResult.isFiltered());
-                    getTopicsResult.getTopics().forEach(topicName ->
-                            log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
-                }
+    private CompletableFuture<Void> recheckTopicsChange() {
+        String pattern = topicsPattern.pattern();
+        final int epoch = recheckPatternEpoch.incrementAndGet();
+        return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
+            .thenCompose(getTopicsResult -> {
+                // If "recheckTopicsChange" has been called more than once, only let the latest call take effect.
+                // Use "synchronized (recheckPatternTaskBackoff)" instead of
+                // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
+                synchronized (recheckPatternTaskBackoff) {
+                    if (recheckPatternEpoch.get() > epoch) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
+                                namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
+                                getTopicsResult.isFiltered());
+                        getTopicsResult.getTopics().forEach(topicName ->
+                                log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
+                    }
 
-                final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
-                for (String partition : getPartitions()) {
-                    TopicName topicName = TopicName.get(partition);
-                    if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
-                        oldTopics.add(partition);
+                    final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
+                    for (String partition : getPartitions()) {
+                        TopicName topicName = TopicName.get(partition);
+                        if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
+                            oldTopics.add(partition);
+                        }
                     }
+                    return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
+                            topicsChangeListener, oldTopics);
                 }
-                return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
-                        topicsChangeListener, oldTopics);
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
-                return null;
-            }).thenAccept(__ -> {
-                // schedule the next re-check task
-                this.recheckPatternTimeout = client.timer()
-                        .newTimeout(PatternMultiTopicsConsumerImpl.this,
-                        Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
             });
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 2ce784dbaac..489a07a606e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -56,11 +56,14 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();
     private final AtomicReference<ClientCnx> clientCnxUsedForWatcherRegistration = new AtomicReference<>();
 
+    private final Runnable recheckTopicsChangeAfterReconnect;
+
 
     public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener,
                             PulsarClientImpl client, Pattern topicsPattern, long watcherId,
                             NamespaceName namespace, String topicsHash,
-                            CompletableFuture<TopicListWatcher> watcherFuture) {
+                            CompletableFuture<TopicListWatcher> watcherFuture,
+                            Runnable recheckTopicsChangeAfterReconnect) {
         super(client, topicsPattern.pattern());
         this.topicsChangeListener = topicsChangeListener;
         this.name = "Watcher(" + topicsPattern + ")";
@@ -77,6 +80,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
         this.namespace = namespace;
         this.topicsHash = topicsHash;
         this.watcherFuture = watcherFuture;
+        this.recheckTopicsChangeAfterReconnect = recheckTopicsChangeAfterReconnect;
 
         connectionHandler.grabCnx();
     }
@@ -141,6 +145,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
 
                         this.connectionHandler.resetBackoff();
 
+                        recheckTopicsChangeAfterReconnect.run();
                         watcherFuture.complete(this);
                         future.complete(null);
                     }).exceptionally((e) -> {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index dd75770b568..7e9fd601d4f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -71,7 +71,7 @@ public class TopicListWatcherTest {
         watcherFuture = new CompletableFuture<>();
         watcher = new TopicListWatcher(listener, client,
                 Pattern.compile(topic), 7,
-                NamespaceName.get("tenant/ns"), null, watcherFuture);
+                NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {});
     }
 
     @Test