You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/08 14:01:14 UTC
(pulsar) branch master updated: [fix] [client] Messages lost due to TopicListWatcher reconnect (#21853)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 042e7691b6e [fix] [client] Messages lost due to TopicListWatcher reconnect (#21853)
042e7691b6e is described below
commit 042e7691b6ef7b7e826b3ec27740fb1f96fbc0b0
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Jan 8 22:01:05 2024 +0800
[fix] [client] Messages lost due to TopicListWatcher reconnect (#21853)
---
.../broker/auth/MockedPulsarServiceBaseTest.java | 9 ++
.../client/impl/PatternTopicsConsumerImplTest.java | 66 ++++++++++---
.../impl/PatternMultiTopicsConsumerImpl.java | 105 +++++++++++++++------
.../pulsar/client/impl/TopicListWatcher.java | 7 +-
.../pulsar/client/impl/TopicListWatcherTest.java | 2 +-
5 files changed, 148 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b8d75bd0fbc..eb75963061e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -708,5 +708,14 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
private PersistentTopic persistentTopic;
}
+ protected void sleepSeconds(int seconds){
+ try {
+ Thread.sleep(1000 * seconds);
+ } catch (InterruptedException e) {
+ log.warn("This thread has been interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 451f93067b2..c708b4cae0a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -37,14 +37,18 @@ import java.util.stream.IntStream;
import io.netty.util.Timeout;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
@@ -53,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
@@ -620,13 +625,28 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
producer3.close();
}
- @Test(timeOut = testTimeout)
- public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
- String key = "AutoSubscribePatternConsumer";
- String subscriptionName = "my-ex-subscription-" + key;
+ @DataProvider(name= "delayTypesOfWatchingTopics")
+ public Object[][] delayTypesOfWatchingTopics(){
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
- Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ @Test(timeOut = testTimeout, dataProvider = "delayTypesOfWatchingTopics")
+ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchingTopics) throws Exception {
+ final String key = "AutoSubscribePatternConsumer";
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
+
+ PulsarClient client = null;
+ if (delayWatchingTopics) {
+ client = createDelayWatchTopicsClient();
+ } else {
+ client = pulsarClient;
+ }
+
+ Consumer<byte[]> consumer = client.newConsumer()
.topicsPattern(pattern)
// Disable automatic discovery.
.patternAutoDiscoveryPeriod(1000)
@@ -636,12 +656,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
- // Wait topic list watcher creation.
- Awaitility.await().untilAsserted(() -> {
- CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
- assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
- });
-
// 1. create partition
String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
@@ -657,7 +671,35 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
});
+ // cleanup.
consumer.close();
+ admin.topics().deletePartitionedTopic(topicName);
+ if (delayWatchingTopics) {
+ client.close();
+ }
+ }
+
+ private PulsarClient createDelayWatchTopicsClient() throws Exception {
+ ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
+ return InjectedClientCnxClientBuilder.create(clientBuilder,
+ (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
+ public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList(
+ BaseCommand command, long requestId) {
+ // Inject a 2-second delay before sending the WatchTopicList command.
+ CompletableFuture<CommandWatchTopicListSuccess> res = new CompletableFuture<>();
+ new Thread(() -> {
+ sleepSeconds(2);
+ super.newWatchTopicList(command, requestId).whenComplete((v, ex) -> {
+ if (ex != null) {
+ res.completeExceptionally(ex);
+ } else {
+ res.complete(v);
+ }
+ });
+ }).start();
+ return res;
+ }
+ });
}
// simulate subscribe a pattern which has 3 topics, but then matched topic added in.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index c6ea6216cc1..f3ebcdee6c0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
@@ -50,8 +51,19 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private final Pattern topicsPattern;
private final TopicsChangedListener topicsChangeListener;
private final Mode subscriptionMode;
- private final CompletableFuture<TopicListWatcher> watcherFuture;
+ private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>();
protected NamespaceName namespaceName;
+
+ /**
+ * There are two tasks that re-check topic changes; they never take effect at the same time.
+ * 1. {@link #recheckTopicsChangeAfterReconnect}: called after the {@link TopicListWatcher} has reconnected,
+ * when {@link TopicListWatcher} is enabled. This backoff is used to schedule a retry if
+ * {@link #recheckTopicsChangeAfterReconnect} fails.
+ * 2. {@link #run}: a scheduled task that re-checks topic changes; it is used when
+ * {@link TopicListWatcher} is disabled.
+ */
+ private final Backoff recheckPatternTaskBackoff;
+ private final AtomicInteger recheckPatternEpoch = new AtomicInteger();
private volatile Timeout recheckPatternTimeout = null;
private volatile String topicsHash;
@@ -69,6 +81,11 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
this.topicsPattern = topicsPattern;
this.topicsHash = topicsHash;
this.subscriptionMode = subscriptionMode;
+ this.recheckPatternTaskBackoff = new BackoffBuilder()
+ .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+ .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
+ .setMandatoryStop(0, TimeUnit.SECONDS)
+ .create();
if (this.namespaceName == null) {
this.namespaceName = getNameSpaceFromPattern(topicsPattern);
@@ -78,11 +95,10 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
this.topicsChangeListener = new PatternTopicsChangedListener();
this.recheckPatternTimeout = client.timer()
.newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
- this.watcherFuture = new CompletableFuture<>();
if (subscriptionMode == Mode.PERSISTENT) {
long watcherId = client.newTopicListWatcherId();
new TopicListWatcher(topicsChangeListener, client, topicsPattern, watcherId,
- namespaceName, topicsHash, watcherFuture);
+ namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect());
watcherFuture
.thenAccept(__ -> recheckPatternTimeout.cancel())
.exceptionally(ex -> {
@@ -99,40 +115,75 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
return TopicName.get(pattern.pattern()).getNamespaceObject();
}
+ /**
+ * Called after the {@link TopicListWatcher} has reconnected; only used when {@link TopicListWatcher} is enabled.
+ */
+ private void recheckTopicsChangeAfterReconnect() {
+ // Skip if the consumer is closing or already closed.
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return;
+ }
+ // Do check.
+ recheckTopicsChange().whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
+ long delayMs = recheckPatternTaskBackoff.next();
+ client.timer().newTimeout(timeout -> {
+ recheckTopicsChangeAfterReconnect();
+ }, delayMs, TimeUnit.MILLISECONDS);
+ } else {
+ recheckPatternTaskBackoff.reset();
+ }
+ });
+ }
+
// TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change.
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
- client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, topicsPattern.pattern(), topicsHash)
- .thenCompose(getTopicsResult -> {
+ recheckTopicsChange().exceptionally(ex -> {
+ log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
+ return null;
+ }).thenAccept(__ -> {
+ // schedule the next re-check task
+ this.recheckPatternTimeout = client.timer()
+ .newTimeout(PatternMultiTopicsConsumerImpl.this,
+ Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+ });
+ }
- if (log.isDebugEnabled()) {
- log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
- namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
- getTopicsResult.isFiltered());
- getTopicsResult.getTopics().forEach(topicName ->
- log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
- }
+ private CompletableFuture<Void> recheckTopicsChange() {
+ String pattern = topicsPattern.pattern();
+ final int epoch = recheckPatternEpoch.incrementAndGet();
+ return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
+ .thenCompose(getTopicsResult -> {
+ // If "recheckTopicsChange" has been called more than once, only the latest call takes effect.
+ // Use "synchronized (recheckPatternTaskBackoff)" instead of
+ // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
+ synchronized (recheckPatternTaskBackoff) {
+ if (recheckPatternEpoch.get() > epoch) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
+ namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
+ getTopicsResult.isFiltered());
+ getTopicsResult.getTopics().forEach(topicName ->
+ log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
+ }
- final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
- for (String partition : getPartitions()) {
- TopicName topicName = TopicName.get(partition);
- if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
- oldTopics.add(partition);
+ final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
+ for (String partition : getPartitions()) {
+ TopicName topicName = TopicName.get(partition);
+ if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
+ oldTopics.add(partition);
+ }
}
+ return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
+ topicsChangeListener, oldTopics);
}
- return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
- topicsChangeListener, oldTopics);
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
- return null;
- }).thenAccept(__ -> {
- // schedule the next re-check task
- this.recheckPatternTimeout = client.timer()
- .newTimeout(PatternMultiTopicsConsumerImpl.this,
- Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
});
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 2ce784dbaac..489a07a606e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -56,11 +56,14 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();
private final AtomicReference<ClientCnx> clientCnxUsedForWatcherRegistration = new AtomicReference<>();
+ private final Runnable recheckTopicsChangeAfterReconnect;
+
public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener,
PulsarClientImpl client, Pattern topicsPattern, long watcherId,
NamespaceName namespace, String topicsHash,
- CompletableFuture<TopicListWatcher> watcherFuture) {
+ CompletableFuture<TopicListWatcher> watcherFuture,
+ Runnable recheckTopicsChangeAfterReconnect) {
super(client, topicsPattern.pattern());
this.topicsChangeListener = topicsChangeListener;
this.name = "Watcher(" + topicsPattern + ")";
@@ -77,6 +80,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
this.namespace = namespace;
this.topicsHash = topicsHash;
this.watcherFuture = watcherFuture;
+ this.recheckTopicsChangeAfterReconnect = recheckTopicsChangeAfterReconnect;
connectionHandler.grabCnx();
}
@@ -141,6 +145,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
this.connectionHandler.resetBackoff();
+ recheckTopicsChangeAfterReconnect.run();
watcherFuture.complete(this);
future.complete(null);
}).exceptionally((e) -> {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index dd75770b568..7e9fd601d4f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -71,7 +71,7 @@ public class TopicListWatcherTest {
watcherFuture = new CompletableFuture<>();
watcher = new TopicListWatcher(listener, client,
Pattern.compile(topic), 7,
- NamespaceName.get("tenant/ns"), null, watcherFuture);
+ NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {});
}
@Test