You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/18 00:52:26 UTC
[pulsar] branch master updated: [PIP-74] Support auto scaled consumer receiver queue (#14494)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bb0e0f2b432 [PIP-74] Support auto scaled consumer receiver queue (#14494)
bb0e0f2b432 is described below
commit bb0e0f2b432bbcdd67a59d8c08cf768811c459ec
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Apr 18 08:52:19 2022 +0800
[PIP-74] Support auto scaled consumer receiver queue (#14494)
* Add autoScaledReceiverQueueSize
* Add autoScaledReceiverQueueSize for PerformanceConsumer
* remove memory limit code
* fix typo
---
.../impl/AutoScaledReceiverQueueSizeTest.java | 258 +++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 14 ++
.../apache/pulsar/client/impl/ConsumerBase.java | 26 ++-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 6 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 30 ++-
.../client/impl/MultiTopicsConsumerImpl.java | 28 ++-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 10 +
.../impl/conf/ConsumerConfigurationData.java | 2 +
.../pulsar/testclient/PerformanceConsumer.java | 38 ++-
9 files changed, 407 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
new file mode 100644
index 00000000000..2b9e2dc3adf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest {
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testConsumerImpl() throws PulsarClientException {
+ String topic = "persistent://public/default/testConsumerImpl" + System.currentTimeMillis();
+ @Cleanup
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .receiverQueueSize(3)
+ .autoScaledReceiverQueueSizeEnabled(true)
+ .subscribe();
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+ producer.send(data);
+ Assert.assertNotNull(consumer.receive());
+ Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+
+ //this will trigger receiver queue size expanding.
+ Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2);
+ Assert.assertFalse(consumer.scaleReceiverQueueHint.get());
+
+ for (int i = 0; i < 5; i++) {
+ producer.send(data);
+ producer.send(data);
+ Assert.assertNotNull(consumer.receive());
+ Assert.assertNotNull(consumer.receive());
+ // queue maybe full, but no empty receive, so no expanding
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2);
+ }
+
+ producer.send(data);
+ producer.send(data);
+ Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+ Assert.assertNotNull(consumer.receive());
+ Assert.assertNotNull(consumer.receive());
+ Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+ // queue is full, with empty receive, expanding to max size
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3);
+ }
+
+ @Test
+ public void testConsumerImplBatchReceive() throws PulsarClientException {
+ String topic = "persistent://public/default/testConsumerImplBatchReceive" + System.currentTimeMillis();
+ @Cleanup
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build())
+ .receiverQueueSize(20)
+ .autoScaledReceiverQueueSizeEnabled(true)
+ .subscribe();
+
+ int currentSize = 8;
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+ for (int i = 0; i < 10; i++) { // just run a few times.
+ for (int j = 0; j < 5; j++) {
+ producer.send(data);
+ }
+ Awaitility.await().until(() -> consumer.incomingMessages.size() == 5);
+ log.info("i={},expandReceiverQueueHint:{},local permits:{}",
+ i, consumer.scaleReceiverQueueHint.get(), consumer.getAvailablePermits());
+ Assert.assertEquals(consumer.batchReceive().size(), 5);
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ }
+
+ //clear local available permits.
+ int n = currentSize / 2 - consumer.getAvailablePermits();
+ for (int i = 0; i < n; i++) {
+ producer.send(data);
+ consumer.receive();
+ }
+ Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+ for (int i = 0; i < currentSize; i++) {
+ producer.send(data);
+ }
+
+ Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+ Assert.assertEquals(consumer.batchReceive().size(), 5);
+
+ //trigger expanding
+ consumer.batchReceiveAsync();
+ Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ }
+
+ @Test
+ public void testMultiConsumerImpl() throws Exception {
+ String topic = "persistent://public/default/testMultiConsumerImpl" + System.currentTimeMillis();
+ admin.topics().createPartitionedTopic(topic, 3);
+ @Cleanup
+ MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .receiverQueueSize(10)
+ .autoScaledReceiverQueueSizeEnabled(true)
+ .subscribe();
+
+ //queue size will be adjusted to partition number.
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3));
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+ for (int i = 0; i < 3; i++) {
+ producer.send(data);
+ }
+ Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+ for (int i = 0; i < 3; i++) {
+ Assert.assertNotNull(consumer.receive());
+ }
+ Assert.assertTrue(consumer.scaleReceiverQueueHint.get());
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3); // queue size no change
+
+ //this will trigger receiver queue size expanding.
+ Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 6);
+ Assert.assertFalse(consumer.scaleReceiverQueueHint.get()); //expandReceiverQueueHint is reset.
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 6; j++) {
+ producer.send(data);
+ }
+ for (int j = 0; j < 6; j++) {
+ Assert.assertNotNull(consumer.receive());
+ }
+ log.info("i={},currentReceiverQueueSize={},expandReceiverQueueHint={}", i,
+ consumer.getCurrentReceiverQueueSize(), consumer.scaleReceiverQueueHint);
+ // queue maybe full, but no empty receive, so no expanding
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 6);
+ }
+
+ for (int j = 0; j < 6; j++) {
+ producer.send(data);
+ }
+ Awaitility.await().until(() -> consumer.scaleReceiverQueueHint.get());
+ for (int j = 0; j < 6; j++) {
+ Assert.assertNotNull(consumer.receive());
+ }
+ Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+ // queue is full, with empty receive, expanding to max size
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 10);
+ }
+
+ @Test
+ public void testMultiConsumerImplBatchReceive() throws PulsarClientException, PulsarAdminException {
+ String topic = "persistent://public/default/testMultiConsumerImplBatchReceive" + System.currentTimeMillis();
+ admin.topics().createPartitionedTopic(topic, 3);
+ @Cleanup
+ MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build())
+ .receiverQueueSize(20)
+ .autoScaledReceiverQueueSizeEnabled(true)
+ .subscribe();
+
+ //receiver queue size init as 5.
+ int currentSize = 5;
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+ for (int i = 0; i < 10; i++) { // just run a few times.
+ for (int j = 0; j < 5; j++) {
+ producer.send(data);
+ }
+ log.info("i={},expandReceiverQueueHint:{},local permits:{}",
+ i, consumer.scaleReceiverQueueHint.get(), consumer.getAvailablePermits());
+ Assert.assertEquals(consumer.batchReceive().size(), 5);
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ }
+
+ for (int i = 0; i < currentSize; i++) {
+ producer.send(data);
+ }
+
+ Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+ Assert.assertEquals(consumer.batchReceive().size(), 5);
+
+ //trigger expanding
+ consumer.batchReceiveAsync();
+ Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
+ log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index caeb3d58b82..029d4de7d9c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -798,4 +798,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
* @default false
*/
ConsumerBuilder<T> startPaused(boolean paused);
+
+ /**
+ * If this is enabled, consumer receiver queue size is init as a very small value, 1 by default,
+ * and it will double itself until it reaches the value set by {@link #receiverQueueSize(int)}, if and only if
+ * 1) User calls receive() and there is no messages in receiver queue.
+ * 2) The last message we put in the receiver queue took the last space available in receiver queue.
+ *
+ * This is disabled by default and currentReceiverQueueSize is init as maxReceiverQueueSize.
+ *
+ * The feature should be able to reduce client memory usage.
+ *
+ * @param enabled whether to enable AutoScaledReceiverQueueSize.
+ */
+ ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6215562160a..f9a7911935c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
@@ -67,6 +68,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
+ protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
protected final String subscription;
protected final ConsumerConfigurationData<T> conf;
@@ -102,6 +104,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
@Getter
protected volatile long consumerEpoch;
+ protected final AtomicBoolean scaleReceiverQueueHint = new AtomicBoolean(false);
+
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
@@ -125,7 +129,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
this.interceptors = interceptors;
- CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize);
if (conf.getBatchReceivePolicy() != null) {
BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
@@ -160,6 +163,22 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
+
+ initReceiverQueueSize();
+ }
+
+
+ public abstract void initReceiverQueueSize();
+
+ protected void expectMoreIncomingMessages() {
+ if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ return;
+ }
+ if (scaleReceiverQueueHint.compareAndSet(true, false)) {
+ int oldSize = getCurrentReceiverQueueSize();
+ int newSize = Math.min(maxReceiverQueueSize, oldSize * 2);
+ setCurrentReceiverQueueSize(newSize);
+ }
}
@Override
@@ -777,10 +796,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
+ updateAutoScaleReceiverQueueHint();
}
return hasEnoughMessagesForBatchReceive();
}
+ protected abstract void updateAutoScaleReceiverQueueHint();
+
protected boolean hasEnoughMessagesForBatchReceive() {
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
@@ -1031,7 +1053,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
protected void resetIncomingMessageSize() {
- INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+ INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
}
protected void decreaseIncomingMessageSize(final Message<?> message) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index a486104bdaa..644c0025d16 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -530,4 +530,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
conf.setStartPaused(paused);
return this;
}
+
+ @Override
+ public ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled) {
+ conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e666dae63b7..3d8f7e56330 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -412,10 +412,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return unsubscribeFuture;
}
+ @Override
+ public void initReceiverQueueSize() {
+ if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ // turn on autoScaledReceiverQueueSize
+ int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+ if (batchReceivePolicy.getMaxNumMessages() > 0) {
+ // consumerImpl may store (half-1) permits locally.
+ size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
+ }
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
+ } else {
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+ }
+ }
+
@Override
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
+ if (incomingMessages.isEmpty()) {
+ expectMoreIncomingMessages();
+ }
message = incomingMessages.take();
messageProcessed(message);
if (!isValidConsumerEpoch(message)) {
@@ -439,6 +457,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
internalPinnedExecutor.execute(() -> {
Message<T> message = incomingMessages.poll();
if (message == null) {
+ expectMoreIncomingMessages();
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
@@ -460,6 +479,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Message<T> message;
long callTime = System.nanoTime();
try {
+ if (incomingMessages.isEmpty()) {
+ expectMoreIncomingMessages();
+ }
message = incomingMessages.poll(timeout, unit);
if (message == null) {
return null;
@@ -524,6 +546,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
result.complete(messages);
} else {
+ expectMoreIncomingMessages();
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
@@ -1609,7 +1632,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
int oldSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize);
int delta = newSize - oldSize;
if (log.isDebugEnabled()) {
- log.debug("[{}][{}] update maxReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
+ log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
topic, subscription, oldSize, newSize, delta);
}
increaseAvailablePermits(delta);
@@ -1902,6 +1925,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
+ @Override
+ protected void updateAutoScaleReceiverQueueHint() {
+ scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+ }
+
@Override
protected void completeOpBatchReceive(OpBatchReceive<T> op) {
notifyPendingBatchReceivedCallBack(op);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 21e0b9941ce..c0c626a600a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -337,10 +337,26 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
.getMessage()));
}
+ @Override
+ public void initReceiverQueueSize() {
+ if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+ if (batchReceivePolicy.getMaxNumMessages() > 0) {
+ size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
+ }
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
+ } else {
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+ }
+ }
+
@Override
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
+ if (incomingMessages.isEmpty()) {
+ expectMoreIncomingMessages();
+ }
message = incomingMessages.take();
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
@@ -363,6 +379,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
long callTime = System.nanoTime();
try {
+ if (incomingMessages.isEmpty()) {
+ expectMoreIncomingMessages();
+ }
message = incomingMessages.poll(timeout, unit);
if (message != null) {
decreaseIncomingMessageSize(message);
@@ -425,6 +444,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
result.complete(messages);
} else {
+ expectMoreIncomingMessages();
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
@@ -444,6 +464,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
internalPinnedExecutor.execute(() -> {
Message<T> message = incomingMessages.poll();
if (message == null) {
+ expectMoreIncomingMessages();
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
@@ -714,6 +735,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
resumeReceivingFromPausedConsumersIfNeeded();
}
+ @Override
+ protected void updateAutoScaleReceiverQueueHint() {
+ scaleReceiverQueueHint.set(incomingMessages.size() >= getCurrentReceiverQueueSize());
+ }
+
@Override
protected void completeOpBatchReceive(OpBatchReceive<T> op) {
notifyPendingBatchReceivedCallBack(op);
@@ -1487,7 +1513,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
checkArgument(newSize > 0, "receiver queue size should larger than 0");
if (log.isDebugEnabled()) {
log.debug("[{}][{}] setMaxReceiverQueueSize={}, previous={}", topic, subscription,
- getCurrentReceiverQueueSize(), newSize);
+ newSize, getCurrentReceiverQueueSize());
}
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
resumeReceivingFromPausedConsumersIfNeeded();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 9f0f2e215a5..e63d803237e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -56,6 +56,16 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
createTopicIfDoesNotExist);
}
+ @Override
+ public void initReceiverQueueSize() {
+ if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ throw new NotImplementedException("AutoScaledReceiverQueueSize is not supported in ZeroQueueConsumerImpl");
+ } else {
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0);
+ }
+
+ }
+
@Override
protected Message<T> internalReceive() throws PulsarClientException {
zeroQueueLock.lock();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index cf1f60d8e93..6c22d143a6f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -162,6 +162,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private boolean startPaused = false;
+ private boolean autoScaledReceiverQueueSizeEnabled = false;
+
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 9d1920ac4f6..bdee5e2df08 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -54,6 +54,9 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
@@ -130,6 +133,10 @@ public class PerformanceConsumer {
description = "Max total size of the receiver queue across partitions")
public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+ @Parameter(names = {"-aq", "--auto-scaled-receiver-queue-size"},
+ description = "Enable autoScaledReceiverQueueSize")
+ public boolean autoScaledReceiverQueueSize = false;
+
@Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
public boolean replicatedSubscription = false;
@@ -342,6 +349,8 @@ public class PerformanceConsumer {
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments));
+ final Recorder qRecorder = arguments.autoScaledReceiverQueueSize
+ ? new Recorder(arguments.receiverQueueSize, 5) : null;
final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
@@ -394,6 +403,9 @@ public class PerformanceConsumer {
thread.interrupt();
}
}
+ if (qRecorder != null) {
+ qRecorder.recordValue(((ConsumerBase<?>) consumer).getTotalIncomingMessages());
+ }
messagesReceived.increment();
bytesReceived.add(msg.size());
@@ -500,7 +512,8 @@ public class PerformanceConsumer {
.autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
.enableBatchIndexAcknowledgment(arguments.batchIndexAck)
.poolMessages(arguments.poolMessages)
- .replicateSubscriptionState(arguments.replicatedSubscription);
+ .replicateSubscriptionState(arguments.replicatedSubscription)
+ .autoScaledReceiverQueueSizeEnabled(arguments.autoScaledReceiverQueueSize);
if (arguments.maxPendingChunkedMessage > 0) {
consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage);
}
@@ -543,6 +556,7 @@ public class PerformanceConsumer {
long oldTime = System.nanoTime();
Histogram reportHistogram = null;
+ Histogram qHistogram = null;
HistogramLogWriter histogramLogWriter = null;
if (arguments.histogramFile != null) {
@@ -596,6 +610,28 @@ public class PerformanceConsumer {
reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
reportHistogram.getValueAtPercentile(99.99), reportHistogram.getMaxValue());
+ if (arguments.autoScaledReceiverQueueSize && log.isDebugEnabled() && qRecorder != null) {
+ qHistogram = qRecorder.getIntervalHistogram(qHistogram);
+ log.debug("ReceiverQueueUsage: cnt={},mean={}, min={},max={},25pct={},50pct={},75pct={}",
+ qHistogram.getTotalCount(), dec.format(qHistogram.getMean()),
+ qHistogram.getMinValue(), qHistogram.getMaxValue(),
+ qHistogram.getValueAtPercentile(25),
+ qHistogram.getValueAtPercentile(50),
+ qHistogram.getValueAtPercentile(75)
+ );
+ qHistogram.reset();
+ for (Future<Consumer<ByteBuffer>> future : futures) {
+ ConsumerBase<?> consumerBase = (ConsumerBase<?>) future.get();
+ log.debug("[{}] CurrentReceiverQueueSize={}", consumerBase.getConsumerName(),
+ consumerBase.getCurrentReceiverQueueSize());
+ if (consumerBase instanceof MultiTopicsConsumerImpl) {
+ for (ConsumerImpl<?> consumer : ((MultiTopicsConsumerImpl<?>) consumerBase).getConsumers()) {
+ log.debug("[{}] SubConsumer.CurrentReceiverQueueSize={}", consumer.getConsumerName(),
+ consumer.getCurrentReceiverQueueSize());
+ }
+ }
+ }
+ }
if (histogramLogWriter != null) {
histogramLogWriter.outputIntervalHistogram(reportHistogram);
}