You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:37 UTC

[pulsar] 43/46: [pulsar-broker] Dispatch messages to consumer with permits (#10417)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a26de54f1bfe68fdd3d2084dc4505fceb768f96c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue May 11 15:31:50 2021 -0700

    [pulsar-broker] Dispatch messages to consumer with permits (#10417)
    
    * [pulsar-broker] Dispatch messages to consumer with permits
    
    * move test
    
    (cherry picked from commit 3550f2e7c1bf41ff548d2cf5d16ae50ebf4c0556)
---
 .../PersistentDispatcherMultipleConsumers.java     | 25 ++++++---
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 59 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 11 ++++
 3 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index f3c6d94..bba7c98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -249,8 +249,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     public synchronized void readMoreEntries() {
         // totalAvailablePermits may be updated by other threads
-        int currentTotalAvailablePermits = totalAvailablePermits;
-        if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+        int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
+        int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
+        if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
             int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
 
             if (-1 == messagesToRead) {
@@ -510,7 +511,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
 
-        while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+        int firstAvailableConsumerPermits, currentTotalAvailablePermits;
+        boolean dispatchMessage;
+        while (entriesToDispatch > 0) {
+            firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
+            currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
+            dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
+            if (!dispatchMessage) {
+                break;
+            }
             Consumer c = getNextConsumer();
             if (c == null) {
                 // Do nothing, cursor will be rewind at reconnection
@@ -668,16 +677,20 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
      * @return
      */
     protected boolean isAtleastOneConsumerAvailable() {
+        return getFirstAvailableConsumerPermits() > 0;
+    }
+
+    protected int getFirstAvailableConsumerPermits() {
         if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
             // abort read if no consumers are connected or if disconnect is initiated
-            return false;
+            return 0;
         }
         for(Consumer consumer : consumerList) {
             if (isConsumerAvailable(consumer)) {
-                return true;
+                return consumer.getAvailablePermits();
             }
         }
-        return false;
+        return 0;
     }
 
     private boolean isConsumerWritable() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index e828598..95cf90a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,8 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -40,6 +43,9 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 
 public class ConsumerRedeliveryTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerRedeliveryTest.class);
+
     @BeforeClass
     @Override
     protected void setup() throws Exception {
@@ -179,4 +185,57 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+    /**
+     * Validates that the broker dispatches messages to a consumer that still has permits to consume more messages.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        final int queueSize = 10;
+        int batchSize = 100;
+        String subName = "my-subscriber-name";
+        String topicName = "permitReceiveBatchMessages"+(UUID.randomUUID().toString());
+        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                .receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
+                .subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+        producerBuilder.enableBatching(true);
+        producerBuilder.batchingMaxPublishDelay(2000, TimeUnit.MILLISECONDS);
+        producerBuilder.batchingMaxMessages(100);
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < batchSize; i++) {
+            String message = "my-message-" + i;
+            producer.sendAsync(message.getBytes());
+        }
+        producer.flush();
+
+        for (int i = 0; i < queueSize; i++) {
+            String message = "my-message-" + i;
+            producer.sendAsync(message.getBytes());
+        }
+        producer.flush();
+
+        retryStrategically((test) -> {
+            return consumer1.getTotalIncomingMessages() == batchSize;
+        }, 5, 2000);
+
+        assertEquals(consumer1.getTotalIncomingMessages(), batchSize);
+
+        ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                .receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
+                .subscribe();
+
+        retryStrategically((test) -> {
+            return consumer2.getTotalIncomingMessages() == queueSize;
+        }, 5, 2000);
+        assertEquals(consumer2.getTotalIncomingMessages(), queueSize);
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 30c3d01..b296a21 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -925,6 +925,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
     }
 
+    public int getTotalIncomingMessages() {
+        return incomingMessages.size();
+    }
+
+    protected void clearIncomingMessages() {
+        // release messages if they are pooled messages
+        incomingMessages.forEach(Message::release);
+        incomingMessages.clear();
+        resetIncomingMessageSize();
+    }
+
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);