You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/09/10 13:10:32 UTC

[pulsar] branch master updated: [fix][broker] Multiple consumer dispatcher stuck when `unackedMessages` greater than `maxUnackedMessages` (#17483)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d7943a5702a [fix][broker] Multiple consumer dispatcher stuck when `unackedMessages` greater than `maxUnackedMessages` (#17483)
d7943a5702a is described below

commit d7943a5702a48ae1fad6afc72d230f130b5b8150
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Sat Sep 10 21:10:21 2022 +0800

    [fix][broker] Multiple consumer dispatcher stuck when `unackedMessages` greater than `maxUnackedMessages` (#17483)
---
 .../PersistentDispatcherMultipleConsumers.java     |   4 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   6 +-
 .../client/impl/KeySharedSubscriptionTest.java     | 189 +++++++++++++++++++++
 3 files changed, 196 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index fd527fd5611..02d2e725379 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -634,7 +634,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // round-robin dispatch batch size for this consumer
             int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
             if (c.getMaxUnackedMessages() > 0) {
-                availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
+                // Avoid negative number
+                int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
+                availablePermits = Math.min(availablePermits, remainUnAckedMessages);
             }
             if (log.isDebugEnabled() && !c.isWritable()) {
                 log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5573596a96e..024ed8581ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -237,8 +237,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             int entriesWithSameKeyCount = entriesWithSameKey.size();
             int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
             if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
-                availablePermits = Math.min(availablePermits,
-                        consumer.getMaxUnackedMessages() - consumer.getUnackedMessages());
+                int remainUnAckedMessages =
+                        // Avoid negative number
+                        Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
+                availablePermits = Math.min(availablePermits, remainUnAckedMessages);
             }
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
             int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
new file mode 100644
index 00000000000..36d3bf6c440
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.Cleanup;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Regression test: {@code Shared} and {@code Key_Shared} dispatchers must resume delivery once
+ * consumers whose unacked-message count went above {@code maxUnackedMessagesPerConsumer}
+ * acknowledge their backlog, instead of staying stuck.
+ */
+@Test(groups = "broker-impl")
+public class KeySharedSubscriptionTest extends ProducerConsumerBase {
+
+    /** Broker-side {@code maxUnackedMessagesPerConsumer} used by every test in this class. */
+    private static final int MAX_UNACKED_MESSAGES_PER_CONSUMER = 10;
+
+    /** How long no message may arrive before the consumers are considered idle (stuck or done). */
+    private static final long IDLE_MILLIS = TimeUnit.SECONDS.toMillis(5);
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        conf.setMaxUnackedMessagesPerConsumer(MAX_UNACKED_MESSAGES_PER_CONSUMER);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider
+    public Object[][] subType() {
+        return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Key_Shared } };
+    }
+
+    /**
+     * Lets three consumers stall on the unacked-message limit, acknowledges everything they have
+     * received, and then checks that every published message is eventually delivered.
+     *
+     * @param subscriptionType the multi-consumer subscription type under test
+     */
+    @Test(dataProvider = "subType")
+    public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType)
+            throws PulsarClientException {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .build();
+        final int totalMsg = 1000;
+        String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
+        // Listener threads write these while the test thread reads them, hence concurrent collections.
+        Map<Consumer<?>, Set<MessageId>> nameToId = Maps.newConcurrentMap();
+        Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
+        Set<MessageId> recMessages = Sets.newConcurrentHashSet();
+        AtomicLong lastActiveTime = new AtomicLong();
+        AtomicBoolean canAcknowledgement = new AtomicBoolean(false);
+
+        // Shared by all consumers: record every delivery and, once allowed, acknowledge it.
+        MessageListener<byte[]> listener = (c, msg) -> {
+            lastActiveTime.set(System.currentTimeMillis());
+            nameToId.computeIfAbsent(c, k -> Sets.newConcurrentHashSet()).add(msg.getMessageId());
+            recMessages.add(msg.getMessageId());
+            if (canAcknowledgement.get()) {
+                try {
+                    c.acknowledge(msg);
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = createListeningConsumer(client, topic, subscriptionType, "con-1", listener);
+        @Cleanup
+        Consumer<byte[]> consumer2 = createListeningConsumer(client, topic, subscriptionType, "con-2", listener);
+        @Cleanup
+        Consumer<byte[]> consumer3 = createListeningConsumer(client, topic, subscriptionType, "con-3", listener);
+
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                // One less than the unacked limit (9 vs. 10): dispatching whole batches can then push
+                // a consumer's unacked count above the limit, which is the case this test reproduces.
+                .batchingMaxMessages(MAX_UNACKED_MESSAGES_PER_CONSUMER - 1)
+                .create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            producer.sendAsync(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                    .thenAccept(pubMessages::add);
+        }
+
+        // Once delivery goes idle, every consumer is blocked by the max-unacked-messages limit.
+        awaitIdle(lastActiveTime);
+
+        // From now on the listener acknowledges every message it receives.
+        canAcknowledgement.set(true);
+
+        // Acknowledge everything received so far to lift the restriction. Each set is copied first as
+        // its listener may still add to it; later additions are acked by the listener (flag is set).
+        for (Map.Entry<Consumer<?>, Set<MessageId>> entry : nameToId.entrySet()) {
+            List<MessageId> received = new ArrayList<>(entry.getValue());
+            entry.getKey().acknowledge(received);
+        }
+        // Restart the idle timer so the next wait measures inactivity from this point.
+        lastActiveTime.set(System.currentTimeMillis());
+
+        // Wait until delivery goes idle again; a recovered dispatcher has delivered everything by then.
+        awaitIdle(lastActiveTime);
+
+        // If the dispatcher is still stuck, some published messages were never received.
+        Assert.assertEquals(pubMessages.size(), totalMsg);
+        Assert.assertEquals(recMessages.size(), pubMessages.size());
+        Assert.assertTrue(recMessages.containsAll(pubMessages));
+    }
+
+    /** Subscribes {@code consumerName} to subscription {@code sub-1} on {@code topic}, using {@code listener}. */
+    private static Consumer<byte[]> createListeningConsumer(PulsarClient client, String topic,
+            SubscriptionType subscriptionType, String consumerName, MessageListener<byte[]> listener)
+            throws PulsarClientException {
+        return client.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub-1")
+                .subscriptionType(subscriptionType)
+                .consumerName(consumerName)
+                .messageListener(listener)
+                .subscribe();
+    }
+
+    /**
+     * Blocks until {@code lastActiveTime} is more than {@code IDLE_MILLIS} in the past. {@code atMost}
+     * raises Awaitility's default 10-second timeout so a slow run does not fail the wait spuriously.
+     *
+     * @param lastActiveTime epoch-millis timestamp of the most recent delivery (or manual reset)
+     */
+    private static void awaitIdle(AtomicLong lastActiveTime) {
+        Awaitility.await()
+                .pollDelay(IDLE_MILLIS, TimeUnit.MILLISECONDS)
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> System.currentTimeMillis() - lastActiveTime.get() > IDLE_MILLIS);
+    }
+}