You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/06 08:07:00 UTC

[pulsar] branch master updated: Fix Consumer listener does not respect receiver queue size (#11455)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09944d9  Fix Consumer listener does not respect receiver queue size (#11455)
09944d9 is described below

commit 09944d9891e0a433da9c7a1cf7b80acf5b3c1116
Author: GuoJiwei <te...@apache.org>
AuthorDate: Fri Aug 6 16:06:18 2021 +0800

    Fix Consumer listener does not respect receiver queue size (#11455)
    
    Fixes  #11008
---
 .../client/api/KeySharedSubscriptionTest.java      |  6 +-
 .../client/api/SimpleProducerConsumerTest.java     | 93 ++++++++++++++++++----
 .../apache/pulsar/client/impl/ConsumerBase.java    | 31 +++++---
 3 files changed, 101 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4142edb..967094a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -525,9 +525,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
             // Since only 1 out of 10 consumers is stuck, we should be able to receive ~90% messages,
             // plus or minus for some skew in the key distribution.
-            Thread.sleep(5000);
-
-            assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
+            });
         } finally {
             for (PulsarClient c : clients) {
                 c.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 77292f8..2eebbf8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4125,11 +4125,14 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
      * <p>
      * Test starts consumer with 10 partitions where one of the partition listener gets blocked but that will not impact
      * processing of other 9 partitions and they will be processed successfully.
+     * As of involved #11455(Fix Consumer listener does not respect receiver queue size),
+     * This test has changed the purpose that different thread run the messageListener. Because messageListener has to
+     * be called one by one, it's possible to run by the same one thread.
      *
      * @throws Exception
      */
     @Test(timeOut = 20000)
-    public void testPartitionTopicsOnSeparateListner() throws Exception {
+    public void testPartitionTopicsOnSeparateListener() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
@@ -4145,20 +4148,12 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         // each partition
         int totalMessages = partitions * 2;
-        CountDownLatch latch = new CountDownLatch(totalMessages - 2);
-        CountDownLatch blockedMessageLatch = new CountDownLatch(1);
+        CountDownLatch latch = new CountDownLatch(1);
         AtomicInteger count = new AtomicInteger();
 
         Set<String> listenerThreads = Sets.newConcurrentHashSet();
         MessageListener<byte[]> messageListener = (c, m) -> {
-            if (count.incrementAndGet() == 1) {
-                try {
-                    // blocking one of the partition's listener thread will not impact other topics
-                    blockedMessageLatch.await();
-                } catch (InterruptedException e) {
-                    // Ok
-                }
-            } else {
+            if (count.incrementAndGet() == totalMessages) {
                 latch.countDown();
             }
             listenerThreads.add(Thread.currentThread().getName());
@@ -4177,9 +4172,79 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             producer1.newMessage().value(("one-partitioned-topic-value-producer1-" + i).getBytes(UTF_8)).send();
         }
         latch.await();
-        assertEquals(listenerThreads.size(), partitions - 1);
-        // unblock the listener thread
-        blockedMessageLatch.countDown();
+        assertTrue(listenerThreads.size() >= 1);
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(timeOut = 30000)
+    public void testShareConsumerWithMessageListener() throws Exception {
+        String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
+        int total = 200;
+        Set<Integer> resultSet = Sets.newConcurrentHashSet();
+        AtomicInteger r1 = new AtomicInteger(0);
+        AtomicInteger r2 = new AtomicInteger(0);
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .maxPendingMessages(500)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("shared")
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(10)
+                .consumerName("c1")
+                .messageListener((MessageListener<Integer>) (consumer, msg) -> {
+                    log.info("c1 received : {}", msg.getValue());
+                    try {
+                        resultSet.add(msg.getValue());
+                        r1.incrementAndGet();
+                        consumer.acknowledge(msg);
+                        Thread.sleep(10);
+                    } catch (InterruptedException ignore) {
+                        //
+                    } catch (PulsarClientException ex) {
+                        log.error("c1 acknowledge error", ex);
+                    }
+                })
+                .subscribe();
+
+        for (int i = 0; i < total; i++) {
+            producer.newMessage()
+                    .value(i)
+                    .send();
+        }
+
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("shared")
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(10)
+                .consumerName("c2")
+                .messageListener((MessageListener<Integer>) (consumer, msg) -> {
+                    log.info("c2 received : {}", msg.getValue());
+                    try {
+                        resultSet.add(msg.getValue());
+                        r2.incrementAndGet();
+                        consumer.acknowledge(msg);
+                        Thread.sleep(10);
+                    } catch (InterruptedException ignore) {
+                        //
+                    } catch (PulsarClientException ex) {
+                        log.error("c2 acknowledge error", ex);
+                    }
+                })
+                .subscribe();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(r1.get() >= 1);
+            assertTrue(r2.get() >= 1);
+            assertEquals(resultSet.size(), total);
+        });
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 5c02904..9e3757b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import io.netty.util.Timeout;
@@ -86,6 +87,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected volatile long incomingMessagesSize = 0;
     protected volatile Timeout batchReceiveTimeout = null;
     protected final Lock reentrantLock = new ReentrantLock();
+    private final AtomicInteger executorQueueSize = new AtomicInteger(0);
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider executorProvider,
@@ -895,24 +897,26 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected void triggerListener() {
         // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
         // thread while the message processing happens
-        Message<T> msg;
-        do {
-            try {
-                msg = internalReceive(0, TimeUnit.MILLISECONDS);
+        try {
+            // Control executor to call MessageListener one by one.
+            if (executorQueueSize.get() < 1) {
+                final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
                 if (msg != null) {
-                    final Message<T> finalMsg = msg;
+                    executorQueueSize.incrementAndGet();
                     if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
-                        executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() ->
-                                callMessageListener(finalMsg));
+                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                                callMessageListener(msg));
                     } else {
-                        getExecutor(msg).execute(() -> callMessageListener(finalMsg));
+                        getExecutor(msg).execute(() -> {
+                            callMessageListener(msg);
+                        });
                     }
                 }
-            } catch (PulsarClientException e) {
-                log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-                return;
             }
-        } while (msg != null);
+        } catch (PulsarClientException e) {
+            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+            return;
+        }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
@@ -929,6 +933,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         } catch (Throwable t) {
             log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
                     msg.getMessageId(), t);
+        } finally {
+            executorQueueSize.decrementAndGet();
+            triggerListener();
         }
     }