You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/18 00:52:26 UTC

[pulsar] branch master updated: [PIP-74] Support auto scaled consumer receiver queue (#14494)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bb0e0f2b432 [PIP-74] Support auto scaled consumer receiver queue  (#14494)
bb0e0f2b432 is described below

commit bb0e0f2b432bbcdd67a59d8c08cf768811c459ec
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Apr 18 08:52:19 2022 +0800

    [PIP-74] Support auto scaled consumer receiver queue  (#14494)
    
    * Add autoScaledReceiverQueueSize
    
    * Add autoScaledReceiverQueueSize for PerformanceConsumer
    
    * remove memory limit code
    
    * fix typo
---
 .../impl/AutoScaledReceiverQueueSizeTest.java      | 258 +++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  14 ++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  26 ++-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   6 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  30 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       |  28 ++-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  10 +
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../pulsar/testclient/PerformanceConsumer.java     |  38 ++-
 9 files changed, 407 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
new file mode 100644
index 00000000000..2b9e2dc3adf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest {
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @org.testng.annotations.AfterClass(alwaysRun = true) // teardown must run after the tests, not before them
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testConsumerImpl() throws PulsarClientException {
+        String topic = "persistent://public/default/testConsumerImpl" + System.currentTimeMillis();
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .receiverQueueSize(3)
+                .autoScaledReceiverQueueSizeEnabled(true)
+                .subscribe();
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        producer.send(data);
+        Assert.assertNotNull(consumer.receive());
+        Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+
+        //this will trigger receiver queue size expanding.
+        Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2);
+        Assert.assertFalse(consumer.scaleReceiverQueueHint.get());
+
+        for (int i = 0; i < 5; i++) {
+            producer.send(data);
+            producer.send(data);
+            Assert.assertNotNull(consumer.receive());
+            Assert.assertNotNull(consumer.receive());
+            // queue may be full, but there was no empty receive, so no expanding
+            Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2);
+        }
+
+        producer.send(data);
+        producer.send(data);
+        Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+        Assert.assertNotNull(consumer.receive());
+        Assert.assertNotNull(consumer.receive());
+        Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+        // queue is full, with empty receive, expanding to max size
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3);
+    }
+
+    @Test
+    public void testConsumerImplBatchReceive() throws PulsarClientException {
+        String topic = "persistent://public/default/testConsumerImplBatchReceive" + System.currentTimeMillis();
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build())
+                .receiverQueueSize(20)
+                .autoScaledReceiverQueueSizeEnabled(true)
+                .subscribe();
+
+        int currentSize = 8;
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (int i = 0; i < 10; i++) { // just run a few times.
+            for (int j = 0; j < 5; j++) {
+                producer.send(data);
+            }
+            Awaitility.await().until(() -> consumer.incomingMessages.size() == 5);
+            log.info("i={},expandReceiverQueueHint:{},local permits:{}",
+                    i, consumer.scaleReceiverQueueHint.get(), consumer.getAvailablePermits());
+            Assert.assertEquals(consumer.batchReceive().size(), 5);
+            Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+            log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+        }
+
+        //clear local available permits.
+        int n = currentSize / 2 - consumer.getAvailablePermits();
+        for (int i = 0; i < n; i++) {
+            producer.send(data);
+            consumer.receive();
+        }
+        Assert.assertEquals(consumer.getAvailablePermits(), 0);
+
+        for (int i = 0; i < currentSize; i++) {
+            producer.send(data);
+        }
+
+        Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+        Assert.assertEquals(consumer.batchReceive().size(), 5);
+
+        //trigger expanding
+        consumer.batchReceiveAsync();
+        Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+    }
+
+    @Test
+    public void testMultiConsumerImpl() throws Exception {
+        String topic = "persistent://public/default/testMultiConsumerImpl" + System.currentTimeMillis();
+        admin.topics().createPartitionedTopic(topic, 3);
+        @Cleanup
+        MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .receiverQueueSize(10)
+                .autoScaledReceiverQueueSizeEnabled(true)
+                .subscribe();
+
+        //queue size will be adjusted to partition number.
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3));
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (int i = 0; i < 3; i++) {
+            producer.send(data);
+        }
+        Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+        for (int i = 0; i < 3; i++) {
+            Assert.assertNotNull(consumer.receive());
+        }
+        Assert.assertTrue(consumer.scaleReceiverQueueHint.get());
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 3); // queue size no change
+
+        //this will trigger receiver queue size expanding.
+        Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 6);
+        Assert.assertFalse(consumer.scaleReceiverQueueHint.get()); //expandReceiverQueueHint is reset.
+
+        for (int i = 0; i < 5; i++) {
+            for (int j = 0; j < 6; j++) {
+                producer.send(data);
+            }
+            for (int j = 0; j < 6; j++) {
+                Assert.assertNotNull(consumer.receive());
+            }
+            log.info("i={},currentReceiverQueueSize={},expandReceiverQueueHint={}", i,
+                    consumer.getCurrentReceiverQueueSize(), consumer.scaleReceiverQueueHint);
+            // queue may be full, but there was no empty receive, so no expanding
+            Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 6);
+        }
+
+        for (int j = 0; j < 6; j++) {
+            producer.send(data);
+        }
+        Awaitility.await().until(() -> consumer.scaleReceiverQueueHint.get());
+        for (int j = 0; j < 6; j++) {
+            Assert.assertNotNull(consumer.receive());
+        }
+        Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+        // queue is full, with empty receive, expanding to max size
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 10);
+    }
+
+    @Test
+    public void testMultiConsumerImplBatchReceive() throws PulsarClientException, PulsarAdminException {
+        String topic = "persistent://public/default/testMultiConsumerImplBatchReceive" + System.currentTimeMillis();
+        admin.topics().createPartitionedTopic(topic, 3);
+        @Cleanup
+        MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(5).build())
+                .receiverQueueSize(20)
+                .autoScaledReceiverQueueSizeEnabled(true)
+                .subscribe();
+
+        //receiver queue size is initialized to 5.
+        int currentSize = 5;
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        for (int i = 0; i < 10; i++) { // just run a few times.
+            for (int j = 0; j < 5; j++) {
+                producer.send(data);
+            }
+            log.info("i={},expandReceiverQueueHint:{},local permits:{}",
+                    i, consumer.scaleReceiverQueueHint.get(), consumer.getAvailablePermits());
+            Assert.assertEquals(consumer.batchReceive().size(), 5);
+            Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), currentSize);
+            log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+        }
+
+        for (int i = 0; i < currentSize; i++) {
+            producer.send(data);
+        }
+
+        Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+        Assert.assertEquals(consumer.batchReceive().size(), 5);
+
+        //trigger expanding
+        consumer.batchReceiveAsync();
+        Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
+        log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index caeb3d58b82..029d4de7d9c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -798,4 +798,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * @default false
      */
     ConsumerBuilder<T> startPaused(boolean paused);
+
+    /**
+     * If this is enabled, the consumer receiver queue size is initialized to a very small value, 1 by default,
+     * and it will double itself until it reaches the value set by {@link #receiverQueueSize(int)}, if and only if
+     * 1) The user calls receive() and there are no messages in the receiver queue.
+     * 2) The last message we put in the receiver queue took the last space available in the receiver queue.
+     *
+     * This is disabled by default, and currentReceiverQueueSize is then initialized to maxReceiverQueueSize.
+     *
+     * The feature should be able to reduce client memory usage.
+     *
+     * @param enabled whether to enable AutoScaledReceiverQueueSize.
+     */
+    ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6215562160a..f9a7911935c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -67,6 +68,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
+    protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
 
     protected final String subscription;
     protected final ConsumerConfigurationData<T> conf;
@@ -102,6 +104,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     @Getter
     protected volatile long consumerEpoch;
 
+    protected final AtomicBoolean scaleReceiverQueueHint = new AtomicBoolean(false);
+
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider executorProvider,
                            CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
@@ -125,7 +129,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
-        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, receiverQueueSize);
         if (conf.getBatchReceivePolicy() != null) {
             BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
             if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
@@ -160,6 +163,22 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
                     batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
+
+        initReceiverQueueSize();
+    }
+
+
+    public abstract void initReceiverQueueSize();
+
+    protected void expectMoreIncomingMessages() {
+        if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            return;
+        }
+        if (scaleReceiverQueueHint.compareAndSet(true, false)) {
+            int oldSize = getCurrentReceiverQueueSize();
+            int newSize = Math.min(maxReceiverQueueSize, oldSize * 2);
+            setCurrentReceiverQueueSize(newSize);
+        }
     }
 
     @Override
@@ -777,10 +796,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
             // anymore, since for pooled messages, this instance was possibly already been released and recycled.
             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
+            updateAutoScaleReceiverQueueHint();
         }
         return hasEnoughMessagesForBatchReceive();
     }
 
+    protected abstract void updateAutoScaleReceiverQueueHint();
+
     protected boolean hasEnoughMessagesForBatchReceive() {
         if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
             return false;
@@ -1031,7 +1053,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void resetIncomingMessageSize() {
-        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
+        INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
     }
 
     protected void decreaseIncomingMessageSize(final Message<?> message) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index a486104bdaa..644c0025d16 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -530,4 +530,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         conf.setStartPaused(paused);
         return this;
     }
+
+    @Override
+    public ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled) {
+        conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e666dae63b7..3d8f7e56330 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -412,10 +412,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return unsubscribeFuture;
     }
 
+    @Override
+    public void initReceiverQueueSize() {
+        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            // turn on autoScaledReceiverQueueSize
+            int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+            if (batchReceivePolicy.getMaxNumMessages() > 0) {
+                // consumerImpl may store (half-1) permits locally.
+                size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
+            }
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
+        } else {
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+        }
+    }
+
     @Override
     protected Message<T> internalReceive() throws PulsarClientException {
         Message<T> message;
         try {
+            if (incomingMessages.isEmpty()) {
+                expectMoreIncomingMessages();
+            }
             message = incomingMessages.take();
             messageProcessed(message);
             if (!isValidConsumerEpoch(message)) {
@@ -439,6 +457,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         internalPinnedExecutor.execute(() -> {
             Message<T> message = incomingMessages.poll();
             if (message == null) {
+                expectMoreIncomingMessages();
                 pendingReceives.add(result);
                 cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
             } else {
@@ -460,6 +479,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         Message<T> message;
         long callTime = System.nanoTime();
         try {
+            if (incomingMessages.isEmpty()) {
+                expectMoreIncomingMessages();
+            }
             message = incomingMessages.poll(timeout, unit);
             if (message == null) {
                 return null;
@@ -524,6 +546,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 }
                 result.complete(messages);
             } else {
+                expectMoreIncomingMessages();
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
@@ -1609,7 +1632,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         int oldSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize);
         int delta = newSize - oldSize;
         if (log.isDebugEnabled()) {
-            log.debug("[{}][{}] update maxReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
+            log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
                     topic, subscription, oldSize, newSize, delta);
         }
         increaseAvailablePermits(delta);
@@ -1902,6 +1925,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
     }
 
+    @Override
+    protected void updateAutoScaleReceiverQueueHint() {
+        scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+    }
+
     @Override
     protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         notifyPendingBatchReceivedCallBack(op);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 21e0b9941ce..c0c626a600a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -337,10 +337,26 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 .getMessage()));
     }
 
+    @Override
+    public void initReceiverQueueSize() {
+        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+            if (batchReceivePolicy.getMaxNumMessages() > 0) {
+                size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
+            }
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
+        } else {
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+        }
+    }
+
     @Override
     protected Message<T> internalReceive() throws PulsarClientException {
         Message<T> message;
         try {
+            if (incomingMessages.isEmpty()) {
+                expectMoreIncomingMessages();
+            }
             message = incomingMessages.take();
             decreaseIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
@@ -363,6 +379,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         long callTime = System.nanoTime();
         try {
+            if (incomingMessages.isEmpty()) {
+                expectMoreIncomingMessages();
+            }
             message = incomingMessages.poll(timeout, unit);
             if (message != null) {
                 decreaseIncomingMessageSize(message);
@@ -425,6 +444,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 }
                 result.complete(messages);
             } else {
+                expectMoreIncomingMessages();
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
@@ -444,6 +464,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         internalPinnedExecutor.execute(() -> {
             Message<T> message = incomingMessages.poll();
             if (message == null) {
+                expectMoreIncomingMessages();
                 pendingReceives.add(result);
                 cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
             } else {
@@ -714,6 +735,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
+    @Override
+    protected void updateAutoScaleReceiverQueueHint() {
+        scaleReceiverQueueHint.set(incomingMessages.size() >= getCurrentReceiverQueueSize());
+    }
+
     @Override
     protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         notifyPendingBatchReceivedCallBack(op);
@@ -1487,7 +1513,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkArgument(newSize > 0, "receiver queue size should larger than 0");
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] setMaxReceiverQueueSize={}, previous={}", topic, subscription,
-                    getCurrentReceiverQueueSize(), newSize);
+                    newSize, getCurrentReceiverQueueSize());
         }
         CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
         resumeReceivingFromPausedConsumersIfNeeded();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 9f0f2e215a5..e63d803237e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -56,6 +56,16 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
                 createTopicIfDoesNotExist);
     }
 
+    @Override
+    public void initReceiverQueueSize() {
+        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            throw new NotImplementedException("AutoScaledReceiverQueueSize is not supported in ZeroQueueConsumerImpl");
+        } else {
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0);
+        }
+
+    }
+
     @Override
     protected Message<T> internalReceive() throws PulsarClientException {
         zeroQueueLock.lock();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index cf1f60d8e93..6c22d143a6f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -162,6 +162,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean startPaused = false;
 
+    private boolean autoScaledReceiverQueueSizeEnabled = false;
+
     public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
         checkArgument(interval > 0, "interval needs to be > 0");
         this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 9d1920ac4f6..bdee5e2df08 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -54,6 +54,9 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
@@ -130,6 +133,10 @@ public class PerformanceConsumer {
                 description = "Max total size of the receiver queue across partitions")
         public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
 
+        @Parameter(names = {"-aq", "--auto-scaled-receiver-queue-size"},
+                description = "Enable autoScaledReceiverQueueSize")
+        public boolean autoScaledReceiverQueueSize = false;
+
         @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated")
         public boolean replicatedSubscription = false;
 
@@ -342,6 +349,8 @@ public class PerformanceConsumer {
         ObjectWriter w = m.writerWithDefaultPrettyPrinter();
         log.info("Starting Pulsar performance consumer with config: {}", w.writeValueAsString(arguments));
 
+        final Recorder qRecorder = arguments.autoScaledReceiverQueueSize
+                ? new Recorder(arguments.receiverQueueSize, 5) : null;
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
         long startTime = System.nanoTime();
         long testEndTime = startTime + (long) (arguments.testTime * 1e9);
@@ -394,6 +403,9 @@ public class PerformanceConsumer {
                         thread.interrupt();
                     }
                 }
+                if (qRecorder != null) {
+                    qRecorder.recordValue(((ConsumerBase<?>) consumer).getTotalIncomingMessages());
+                }
                 messagesReceived.increment();
                 bytesReceived.add(msg.size());
 
@@ -500,7 +512,8 @@ public class PerformanceConsumer {
                 .autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
                 .enableBatchIndexAcknowledgment(arguments.batchIndexAck)
                 .poolMessages(arguments.poolMessages)
-                .replicateSubscriptionState(arguments.replicatedSubscription);
+                .replicateSubscriptionState(arguments.replicatedSubscription)
+                .autoScaledReceiverQueueSizeEnabled(arguments.autoScaledReceiverQueueSize);
         if (arguments.maxPendingChunkedMessage > 0) {
             consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage);
         }
@@ -543,6 +556,7 @@ public class PerformanceConsumer {
         long oldTime = System.nanoTime();
 
         Histogram reportHistogram = null;
+        Histogram qHistogram = null;
         HistogramLogWriter histogramLogWriter = null;
 
         if (arguments.histogramFile != null) {
@@ -596,6 +610,28 @@ public class PerformanceConsumer {
                     reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
                     reportHistogram.getValueAtPercentile(99.99), reportHistogram.getMaxValue());
 
+            if (arguments.autoScaledReceiverQueueSize && log.isDebugEnabled() && qRecorder != null) {
+                qHistogram = qRecorder.getIntervalHistogram(qHistogram);
+                log.debug("ReceiverQueueUsage: cnt={},mean={}, min={},max={},25pct={},50pct={},75pct={}",
+                        qHistogram.getTotalCount(), dec.format(qHistogram.getMean()),
+                        qHistogram.getMinValue(), qHistogram.getMaxValue(),
+                        qHistogram.getValueAtPercentile(25),
+                        qHistogram.getValueAtPercentile(50),
+                        qHistogram.getValueAtPercentile(75)
+                );
+                qHistogram.reset();
+                for (Future<Consumer<ByteBuffer>> future : futures) {
+                    ConsumerBase<?> consumerBase = (ConsumerBase<?>) future.get();
+                    log.debug("[{}] CurrentReceiverQueueSize={}", consumerBase.getConsumerName(),
+                            consumerBase.getCurrentReceiverQueueSize());
+                    if (consumerBase instanceof MultiTopicsConsumerImpl) {
+                        for (ConsumerImpl<?> consumer : ((MultiTopicsConsumerImpl<?>) consumerBase).getConsumers()) {
+                            log.debug("[{}] SubConsumer.CurrentReceiverQueueSize={}", consumer.getConsumerName(),
+                                    consumer.getCurrentReceiverQueueSize());
+                        }
+                    }
+                }
+            }
             if (histogramLogWriter != null) {
                 histogramLogWriter.outputIntervalHistogram(reportHistogram);
             }