You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/12 01:35:55 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Multiple consumer dispatcher stuck when `unackedMessages` greater than `maxUnackedMessages` (#17483)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 011e1217846 [fix][broker] Multiple consumer dispatcher stuck when `unackedMessages` greater than `maxUnackedMessages` (#17483)
011e1217846 is described below
commit 011e121784614199889c999b3e1a4a717af07938
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Sat Sep 10 21:10:21 2022 +0800
[fix][broker] Multiple consumer dispatcher stuck when `unackedMessages` greater than `maxUnackedMessages` (#17483)
(cherry picked from commit d7943a5702a48ae1fad6afc72d230f130b5b8150)
---
.../PersistentDispatcherMultipleConsumers.java | 4 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 6 +-
.../client/impl/KeySharedSubscriptionTest.java | 189 +++++++++++++++++++++
3 files changed, 196 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index bfff0c67c73..15c8654fe36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -558,7 +558,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
if (c.getMaxUnackedMessages() > 0) {
- availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
+ // Avoid negative number
+ int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
+ availablePermits = Math.min(availablePermits, remainUnAckedMessages);
}
if (log.isDebugEnabled() && !c.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 60762d8400a..90db639fde3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -238,8 +238,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
int entriesWithSameKeyCount = entriesWithSameKey.size();
int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
- availablePermits = Math.min(availablePermits,
- consumer.getMaxUnackedMessages() - consumer.getUnackedMessages());
+ int remainUnAckedMessages =
+ // Avoid negative number
+ Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
+ availablePermits = Math.min(availablePermits, remainUnAckedMessages);
}
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
new file mode 100644
index 00000000000..36d3bf6c440
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.Cleanup;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Checks that multi-consumer dispatchers recover once consumers that went over
+ * {@code maxUnackedMessagesPerConsumer} acknowledge their backlog.
+ */
+@Test(groups = "broker-impl")
+public class KeySharedSubscriptionTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        // A small per-consumer limit so every consumer reaches it quickly.
+        conf.setMaxUnackedMessagesPerConsumer(10);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Runs the recovery test against both multi-consumer subscription types.
+     */
+    @DataProvider
+    public Object[][] subType() {
+        return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Key_Shared } };
+    }
+
+    /**
+     * Lets three non-acknowledging consumers get blocked by the unacked-message limit, then
+     * acknowledges everything and asserts that every published message is eventually received.
+     * A dispatcher that stays stuck after the limit is lifted makes the final assertions fail.
+     *
+     * @param subscriptionType subscription type under test, supplied by {@link #subType()}
+     */
+    @Test(dataProvider = "subType")
+    public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType)
+            throws PulsarClientException {
+        // A dedicated client, closed by @Cleanup when the test returns.
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .build();
+        final int totalMsg = 1000;
+        String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
+        // Values are copy-on-write lists: listener threads append while the test thread iterates.
+        Map<Consumer<?>, List<MessageId>> consumerToIds = Maps.newConcurrentMap();
+        Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
+        Set<MessageId> recMessages = Sets.newConcurrentHashSet();
+        AtomicLong lastActiveTime = new AtomicLong();
+        AtomicBoolean canAcknowledge = new AtomicBoolean(false);
+
+        // Shared by all consumers: record each delivery and acknowledge it only once allowed.
+        MessageListener<byte[]> listener = (cons, msg) -> {
+            lastActiveTime.set(System.currentTimeMillis());
+            consumerToIds.computeIfAbsent(cons, k -> new CopyOnWriteArrayList<>())
+                    .add(msg.getMessageId());
+            recMessages.add(msg.getMessageId());
+            if (canAcknowledge.get()) {
+                try {
+                    cons.acknowledge(msg);
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+        // Three consumers on the same subscription, so delivery goes through a multi-consumer dispatcher.
+        @Cleanup
+        Consumer<byte[]> consumer1 = subscribe(client, topic, subscriptionType, "con-1", listener);
+        @Cleanup
+        Consumer<byte[]> consumer2 = subscribe(client, topic, subscriptionType, "con-2", listener);
+        @Cleanup
+        Consumer<byte[]> consumer3 = subscribe(client, topic, subscriptionType, "con-3", listener);
+
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                // 9 is one below the unacked limit of 10, so a consumer already holding one batch
+                // can be handed another whole batch and end up above the limit.
+                .batchingMaxMessages(9)
+                .create();
+
+        CompletableFuture<?>[] sends = new CompletableFuture<?>[totalMsg];
+        for (int i = 0; i < totalMsg; i++) {
+            sends[i] = producer.sendAsync(UUID.randomUUID().toString()
+                            .getBytes(StandardCharsets.UTF_8))
+                    .thenAccept(pubMessages::add);
+        }
+        // Every publish and its bookkeeping callback must finish before pubMessages is asserted on.
+        CompletableFuture.allOf(sends).join();
+
+        // Wait until no consumer has received anything for 5 seconds: all of them are now
+        // blocked by the unacked-message limit.
+        Awaitility.await()
+                .pollDelay(5, TimeUnit.SECONDS)
+                .until(() ->
+                        (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));
+
+        // From now on, listeners acknowledge every message they receive.
+        canAcknowledge.set(true);
+
+        // Acknowledge everything received so far to lift the unacked-message restriction.
+        for (Map.Entry<Consumer<?>, List<MessageId>> entry : consumerToIds.entrySet()) {
+            entry.getKey().acknowledge(entry.getValue());
+        }
+        // Restart the idle clock so the next wait measures inactivity after the acknowledgements.
+        lastActiveTime.set(System.currentTimeMillis());
+
+        // Wait until deliveries go quiet again, i.e. the remaining backlog has been consumed.
+        Awaitility.await()
+                .pollDelay(5, TimeUnit.SECONDS)
+                .until(() ->
+                        (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));
+
+        // If the dispatcher stayed stuck, not every published message has been received.
+        Assert.assertEquals(pubMessages.size(), totalMsg);
+        Assert.assertEquals(recMessages.size(), pubMessages.size());
+        Assert.assertTrue(recMessages.containsAll(pubMessages));
+    }
+
+    /**
+     * Subscribes {@code consumerName} to subscription "sub-1" on {@code topic} with the given listener.
+     */
+    private static Consumer<byte[]> subscribe(PulsarClient client, String topic,
+                                              SubscriptionType subscriptionType, String consumerName,
+                                              MessageListener<byte[]> listener) throws PulsarClientException {
+        return client.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub-1")
+                .subscriptionType(subscriptionType)
+                .consumerName(consumerName)
+                .messageListener(listener)
+                .subscribe();
+    }
+}