You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/03/26 06:58:31 UTC

(pulsar) branch branch-2.10 updated: [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9b432d7151e [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)
9b432d7151e is described below

commit 9b432d7151e1e61069149acc1a642f7a34857a73
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Mar 26 07:41:07 2024 +0800

    [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)
    
    (cherry picked from commit a52945b1c51fa874667eecb9fea9bf03e5d6153b)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  16 ++-
 .../PersistentDispatcherMultipleConsumers.java     |   8 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   5 +-
 ...impleProducerConsumerMLInitializeDelayTest.java | 108 +++++++++++++++++++++
 4 files changed, 131 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6d84e22f162..08d84f3e4fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1037,10 +1037,20 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                 "Consumer is already present on the connection");
                     } else if (existingConsumerFuture.isCompletedExceptionally()){
+                        log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
                         ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
-                                String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
-                                        remoteAddress, subscriptionName));
-                        consumers.remove(consumerId, existingConsumerFuture);
+                                String.format("A failed consumer with id is already present on the connection."
+                                                + " consumerId: %s, remoteAddress: %s, subscription: %s",
+                                        consumerId, remoteAddress, subscriptionName));
+                        /**
+                         * This future may have failed because the client closed an in-progress subscription.
+                         * See {@link #handleCloseConsumer(CommandCloseConsumer)}.
+                         * Do not remove the failed future here; it will be removed once the previous
+                         * subscribe attempt has finished.
+                         * Until then, every new subscribe request will fail.
+                         * This mechanism avoids more complex logic for handling the race conditions.
+                         */
                         commandSender.sendErrorResponse(requestId, error,
                                 "Consumer that failed is already present on the connection");
                     } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b1cc82b146f..bf7b8e42053 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -156,9 +156,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
 
         if (isConsumersExceededOnSubscription()) {
-            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name);
+            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}",
+                    name, consumer);
             throw new ConsumerBusyException("Subscription reached max consumers limit");
         }
+        // This scenario is not expected and should never happen. Just print a warning log if it does.
+        // See more detail: https://github.com/apache/pulsar/pull/22283.
+        if (consumerSet.contains(consumer)) {
+            log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer);
+        }
 
         consumerList.add(consumer);
         if (consumerList.size() > 1
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 4a4628bece4..8d0f31a2090 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -3130,8 +3130,9 @@ public class ServerCnxTest {
             };
             // assert error response
             assertTrue(responseAssert.test(responseAssert));
-            // assert consumer-delete event occur
-            assertEquals(1L,
+            // The delete event will only occur after the future is completed.
+            // assert consumer-delete event will not occur.
+            assertEquals(0L,
                     deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
             // Server will not close the connection
             assertTrue(channel.isOpen());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
new file mode 100644
index 00000000000..ab4e063ae3d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.carrotsearch.hppc.ObjectSet;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that a dispatcher's {@code consumerList} and {@code consumerSet} stay consistent when a delayed
+ * managed-ledger initialization makes the client time out and retry its subscribe request.
+ */
+@Slf4j
+@Test(groups = "broker-api")
+public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setTopicLoadTimeoutSeconds(60 * 5);
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testConsumerListMatchesConsumerSet() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "sub";
+        final int clientOperationTimeout = 3;
+        final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000;
+        final int clientMaxBackoffSeconds = clientOperationTimeout * 2;
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Create a client with a low operation timeout.
+        PulsarClient client = PulsarClient.builder().serviceUrl(lookupUrl.toString())
+                .operationTimeout(clientOperationTimeout, TimeUnit.SECONDS)
+                .maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS)
+                .build();
+        Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        // Inject a one-time delay into the initialization of the ML, so that the consumer registers twice:
+        // the first attempt times out on the client side and the client then tries again.
+        AtomicInteger delayTimes = new AtomicInteger();
+        mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> {
+            if (op.toString().equals("GET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
+                return delayTimes.incrementAndGet() == 1;
+            }
+            return false;
+        });
+        admin.topics().unload(topicName);
+        // Verify: eventually "dispatcher.consumerSet" and "dispatcher.consumerList" both hold exactly one consumer.
+        // The budget is computed from a millisecond value, so it has to be built with Duration.ofMillis.
+        Awaitility.await().atMost(Duration.ofMillis(loadMLDelayMillis * 3))
+                .ignoreExceptions().untilAsserted(() -> {
+            Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topicName, false).join().get()
+                    .getSubscription(subName).getDispatcher();
+            ObjectSet<?> consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
+            List<?> consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList");
+            log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size());
+            Assert.assertEquals(consumerList.size(), 1);
+            Assert.assertEquals(consumerSet.size(), 1);
+        });
+
+        // Verify: the topic can be deleted.
+        consumer.close();
+        admin.topics().delete(topicName);
+        // cleanup.
+        client.close();
+    }
+}