You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/03/26 06:58:31 UTC
(pulsar) branch branch-2.10 updated: [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 9b432d7151e [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)
9b432d7151e is described below
commit 9b432d7151e1e61069149acc1a642f7a34857a73
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Mar 26 07:41:07 2024 +0800
[fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)
(cherry picked from commit a52945b1c51fa874667eecb9fea9bf03e5d6153b)
---
.../apache/pulsar/broker/service/ServerCnx.java | 16 ++-
.../PersistentDispatcherMultipleConsumers.java | 8 +-
.../pulsar/broker/service/ServerCnxTest.java | 5 +-
...impleProducerConsumerMLInitializeDelayTest.java | 108 +++++++++++++++++++++
4 files changed, 131 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6d84e22f162..08d84f3e4fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1037,10 +1037,20 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
"Consumer is already present on the connection");
} else if (existingConsumerFuture.isCompletedExceptionally()){
+ log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection,"
+ + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
- String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
- remoteAddress, subscriptionName));
- consumers.remove(consumerId, existingConsumerFuture);
+ String.format("A failed consumer with id is already present on the connection."
+ + " consumerId: %s, remoteAddress: %s, subscription: %s",
+ consumerId, remoteAddress, subscriptionName));
+ /**
+ * This future may have failed because the client closed an in-progress subscription.
+ * See {@link #handleCloseConsumer(CommandCloseConsumer)}.
+ * Do not remove the failed future here; it will be removed once the previous
+ * subscribe attempt has finished.
+ * Until the previous subscribe attempt has finished, new subscribe requests will always fail.
+ * This mechanism avoids more complex logic for handling the race conditions.
+ */
commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on the connection");
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b1cc82b146f..bf7b8e42053 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -156,9 +156,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
if (isConsumersExceededOnSubscription()) {
- log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name);
+ log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}",
+ name, consumer);
throw new ConsumerBusyException("Subscription reached max consumers limit");
}
+ // This scenario is not expected to happen. Just log a warning if it ever does.
+ // For more detail, see https://github.com/apache/pulsar/pull/22283.
+ if (consumerSet.contains(consumer)) {
+ log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer);
+ }
consumerList.add(consumer);
if (consumerList.size() > 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4a4628bece4..8d0f31a2090 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -3130,8 +3130,9 @@ public class ServerCnxTest {
};
// assert error response
assertTrue(responseAssert.test(responseAssert));
- // assert consumer-delete event occur
- assertEquals(1L,
+ // The delete event will only occur after the future is completed.
+ // assert consumer-delete event will not occur.
+ assertEquals(0L,
deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
// Server will not close the connection
assertTrue(channel.isOpen());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
new file mode 100644
index 00000000000..ab4e063ae3d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.carrotsearch.hppc.ObjectSet;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/** Tests consumer registration when the initialization of the ML is delayed by an injected delay. */
+@Slf4j
+@Test(groups = "broker-api")
+public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setTopicLoadTimeoutSeconds(60 * 5);
+    }
+
+    /**
+     * Verifies that a subscribe retried after a slow ML load leaves exactly one consumer in the dispatcher.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testConsumerListMatchesConsumerSet() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "sub";
+        final int clientOperationTimeout = 3;
+        final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000;
+        final int clientMaxBackoffSeconds = clientOperationTimeout * 2;
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Create a client with a low operation timeout; try-with-resources closes it even if the test fails.
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .operationTimeout(clientOperationTimeout, TimeUnit.SECONDS)
+                .maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS)
+                .build()) {
+            Consumer<byte[]> consumer = client.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscribe();
+            // Inject a delay into the initialization of the ML, so that the consumer registers twice:
+            // the first subscribe times out on the client side, and the client then tries again.
+            AtomicInteger delayTimes = new AtomicInteger();
+            final String mlPath = TopicName.get(topicName).getPersistenceNamingEncoding();
+            mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> "GET".equals(op.toString())
+                    && s.contains(mlPath) && delayTimes.incrementAndGet() == 1);
+            admin.topics().unload(topicName);
+            // Verify: eventually "dispatcher.consumerSet" and "dispatcher.consumerList" both hold one consumer.
+            // loadMLDelayMillis is expressed in milliseconds, so the bound must be built with ofMillis.
+            Awaitility.await().atMost(Duration.ofMillis(loadMLDelayMillis * 3))
+                    .ignoreExceptions().untilAsserted(() -> {
+                Dispatcher dispatcher = pulsar.getBrokerService()
+                        .getTopic(topicName, false).join().get()
+                        .getSubscription(subName).getDispatcher();
+                ObjectSet<?> consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+                List<?> consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList");
+                log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size());
+                Assert.assertEquals(consumerList.size(), 1);
+                Assert.assertEquals(consumerSet.size(), 1);
+            });
+            // Verify: the topic can be deleted once the consumer is closed.
+            consumer.close();
+            admin.topics().delete(topicName);
+        }
+    }
+}