You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:37 UTC
[pulsar] 43/46: [pulsar-broker] Dispatch messages to consumer with
permits (#10417)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a26de54f1bfe68fdd3d2084dc4505fceb768f96c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue May 11 15:31:50 2021 -0700
[pulsar-broker] Dispatch messages to consumer with permits (#10417)
* [pulsar-broker] Dispatch messages to consumer with permits
* move test
(cherry picked from commit 3550f2e7c1bf41ff548d2cf5d16ae50ebf4c0556)
---
.../PersistentDispatcherMultipleConsumers.java | 25 ++++++---
.../pulsar/client/api/ConsumerRedeliveryTest.java | 59 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 11 ++++
3 files changed, 89 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index f3c6d94..bba7c98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -249,8 +249,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
public synchronized void readMoreEntries() {
// totalAvailablePermits may be updated by other threads
- int currentTotalAvailablePermits = totalAvailablePermits;
- if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+ int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
+ int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
+ if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
if (-1 == messagesToRead) {
@@ -510,7 +511,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
long totalMessagesSent = 0;
long totalBytesSent = 0;
- while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
+ int firstAvailableConsumerPermits, currentTotalAvailablePermits;
+ boolean dispatchMessage;
+ while (entriesToDispatch > 0) {
+ firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
+ currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
+ dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
+ if (!dispatchMessage) {
+ break;
+ }
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
@@ -668,16 +677,20 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
* @return
*/
protected boolean isAtleastOneConsumerAvailable() {
+ return getFirstAvailableConsumerPermits() > 0;
+ }
+
+ protected int getFirstAvailableConsumerPermits() {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
- return false;
+ return 0;
}
for(Consumer consumer : consumerList) {
if (isConsumerAvailable(consumer)) {
- return true;
+ return consumer.getAvailablePermits();
}
}
- return false;
+ return 0;
}
private boolean isConsumerWritable() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index e828598..95cf90a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -29,6 +30,8 @@ import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -40,6 +43,9 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
public class ConsumerRedeliveryTest extends ProducerConsumerBase {
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerRedeliveryTest.class);
+
@BeforeClass
@Override
protected void setup() throws Exception {
@@ -179,4 +185,57 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
consumer.close();
}
+ /**
+ * Validates broker should dispatch messages to consumer which still has the permit to consume more messages.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+
+ final int queueSize = 10;
+ int batchSize = 100;
+ String subName = "my-subscriber-name";
+ String topicName = "permitReceiveBatchMessages"+(UUID.randomUUID().toString());
+ ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+ .receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
+ .subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ producerBuilder.enableBatching(true);
+ producerBuilder.batchingMaxPublishDelay(2000, TimeUnit.MILLISECONDS);
+ producerBuilder.batchingMaxMessages(100);
+
+ Producer<byte[]> producer = producerBuilder.create();
+ for (int i = 0; i < batchSize; i++) {
+ String message = "my-message-" + i;
+ producer.sendAsync(message.getBytes());
+ }
+ producer.flush();
+
+ for (int i = 0; i < queueSize; i++) {
+ String message = "my-message-" + i;
+ producer.sendAsync(message.getBytes());
+ }
+ producer.flush();
+
+ retryStrategically((test) -> {
+ return consumer1.getTotalIncomingMessages() == batchSize;
+ }, 5, 2000);
+
+ assertEquals(consumer1.getTotalIncomingMessages(), batchSize);
+
+ ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+ .receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
+ .subscribe();
+
+ retryStrategically((test) -> {
+ return consumer2.getTotalIncomingMessages() == queueSize;
+ }, 5, 2000);
+ assertEquals(consumer2.getTotalIncomingMessages(), queueSize);
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 30c3d01..b296a21 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -925,6 +925,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}
+ public int getTotalIncomingMessages() {
+ return incomingMessages.size();
+ }
+
+ protected void clearIncomingMessages() {
+ // release messages if they are pooled messages
+ incomingMessages.forEach(Message::release);
+ incomingMessages.clear();
+ resetIncomingMessageSize();
+ }
+
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);