You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/05 00:11:05 UTC

[pulsar] branch master updated: Fixed KeyShared consumers getting stuck on delivery (#7105)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f75d02  Fixed KeyShared consumers getting stuck on delivery (#7105)
2f75d02 is described below

commit 2f75d0263bf708574a3cef5aa0e47891e1d6a3f3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 4 17:10:52 2020 -0700

    Fixed KeyShared consumers getting stuck on delivery (#7105)
    
    Motivation
    If one consumer processes messages slowly, it can prevent the other consumers from making progress on the topic: the dispatcher gets stuck in a loop, repeatedly trying to replay messages without being able to dispatch any of them.
    
    The basic idea is that we can still make progress by continuing to read through the topic and dispatching messages to the consumers that are free, at least for the keys that belong to them.
---
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 29 +++++++++
 .../client/api/KeySharedSubscriptionTest.java      | 71 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c7af338..b8255ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -784,7 +784,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
-    private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (!messagesToRedeliver.isEmpty()) {
             return messagesToRedeliver.items(maxMessagesToRead,
                     (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ce946fc..9e93ff8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.ReadType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +48,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private final StickyKeyConsumerSelector selector;
 
+    private boolean isDispatcherStuckOnReplays = false;
+
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
            Subscription subscription, StickyKeyConsumerSelector selector) {
         super(topic, cursor, subscription);
@@ -159,6 +164,30 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
             }
         }
+
+        if (totalMessagesSent == 0) {
+            // This means that none of the messages we've just read can be dispatched right now.
+            // This condition can only happen when:
+            //  1. We have consumers ready to accept messages (otherwise this would not have been triggered)
+            //  2. All keys in the current set of messages are routed to consumers that are currently busy
+            //
+            // The solution is to move on and read the next batch, which may also contain keys for other consumers.
+            isDispatcherStuckOnReplays = true;
+            readMoreEntries();
+        }
+    }
+
+    @Override
+    protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+        if (isDispatcherStuckOnReplays) {
+            // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
+            // messages kicks in), instead of repeatedly replaying the same old messages, since the consumers these
+            // messages are routed to might be busy at the moment
+            this.isDispatcherStuckOnReplays = false;
+            return Collections.emptySet();
+        } else {
+            return super.getMessagesToReplayNow(maxMessagesToRead);
+        }
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 84a1978..e0c4093 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -41,7 +41,10 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -392,6 +395,80 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(dataProvider = "batch")
+    public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exception {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "testMakingProgressWithSlowerConsumer-" + UUID.randomUUID();
+
+        String slowKey = "slowKey";
+
+        List<PulsarClient> clients = new ArrayList<>();
+
+        AtomicInteger receivedMessages = new AtomicInteger();
+
+        try {
+            for (int i = 0; i < 10; i++) {
+                PulsarClient client = PulsarClient.builder()
+                        .serviceUrl(brokerUrl.toString())
+                        .build();
+                clients.add(client);
+
+                client.newConsumer(Schema.INT32)
+                        .topic(topic)
+                        .subscriptionName("key_shared")
+                        .subscriptionType(SubscriptionType.Key_Shared)
+                        .receiverQueueSize(1)
+                        .messageListener((consumer, msg) -> {
+                            try {
+                                if (slowKey.equals(msg.getKey())) {
+                                    // Block the listener thread to simulate a slow consumer
+                                    Thread.sleep(10000);
+                                }
+
+                                receivedMessages.incrementAndGet();
+                                consumer.acknowledge(msg);
+                            } catch (InterruptedException e) {
+                                // Restore the interrupt flag instead of silently swallowing the interruption
+                                Thread.currentThread().interrupt();
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                        })
+                        .subscribe();
+            }
+
+            @Cleanup
+            Producer<Integer> producer = createProducer(topic, enableBatch);
+
+            // First send the "slow key" so that 1 consumer will get stuck
+            producer.newMessage()
+                    .key(slowKey)
+                    .value(-1)
+                    .send();
+
+            int N = 1000;
+
+            // Then send all the other keys
+            for (int i = 0; i < N; i++) {
+                producer.newMessage()
+                        .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                        .value(i)
+                        .send();
+            }
+
+            // Since only 1 out of 10 consumers is stuck, we should be able to receive ~90% of the messages,
+            // plus or minus some skew in the key distribution.
+            Thread.sleep(5000);
+
+            assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
+        } finally {
+            // Close every client even if subscribing or the assertion fails, so none leak into other tests
+            for (PulsarClient c : clients) {
+                c.close();
+            }
+        }
+    }
+
     private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
         Producer<Integer> producer = null;
         if (enableBatch) {