You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/05/07 01:59:59 UTC

[pulsar] branch master updated: PIP-74 add support consumer client memory limit (#15216)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a098d5418b PIP-74 add support consumer client memory limit (#15216)
1a098d5418b is described below

commit 1a098d5418be7d4674f9ba9dbc4da0018f91654a
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Sat May 7 09:59:52 2022 +0800

    PIP-74 add support consumer client memory limit (#15216)
---
 .../client/impl/ConsumerMemoryLimitTest.java       | 111 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  40 +++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  23 ++---
 .../pulsar/client/impl/MemoryLimitController.java  |  26 +++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  17 ++--
 .../pulsar/client/impl/PulsarClientImpl.java       |  11 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   6 +-
 .../client/impl/MemoryLimitControllerTest.java     |  39 +++++++-
 8 files changed, 245 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
new file mode 100644
index 00000000000..3c621fb83f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+@Slf4j
+public class ConsumerMemoryLimitTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testConsumerMemoryLimit() throws Exception {
+        String topic = newTopicName();
+
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .memoryLimit(10, SizeUnit.KILO_BYTES);
+
+        @Cleanup
+        PulsarTestClient client = PulsarTestClient.create(clientBuilder);
+
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topic).enableBatching(false)
+                .blockIfQueueFull(false)
+                .create();
+
+        @Cleanup
+        ConsumerImpl<byte[]> c1 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub").topic(topic)
+                .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+        @Cleanup
+        ConsumerImpl<byte[]> c2 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub2").topic(topic)
+                .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+        c2.updateAutoScaleReceiverQueueHint();
+        int n = 5;
+        for (int i = 0; i < n; i++) {
+            producer.send(new byte[3000]);
+        }
+        Awaitility.await().until(c1.scaleReceiverQueueHint::get);
+
+
+        c1.setCurrentReceiverQueueSize(10);
+        Awaitility.await().until(() -> c1.incomingMessages.size() == n);
+        log.info("memory usage:{}", client.getMemoryLimitController().currentUsagePercent());
+
+        // 1. Check that the memory limit has been exceeded (usage above 100%).
+        Assert.assertTrue(client.getMemoryLimitController().currentUsagePercent() > 1);
+
+        // 2. Check that c2 cannot expand its receiver queue.
+        Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);
+        for (int i = 0; i < n; i++) {
+            Awaitility.await().until(() -> c2.incomingMessages.size() == 1);
+            Assert.assertNotNull(c2.receive());
+        }
+        Assert.assertTrue(c2.scaleReceiverQueueHint.get());
+        c2.receiveAsync(); //this should trigger c2 receiver queue size expansion.
+        Awaitility.await().until(() -> !c2.pendingReceives.isEmpty()); //make sure expectMoreIncomingMessages is called.
+        Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);
+
+        // 3. Check that the producer cannot send a message because the memory buffer is full.
+        Assert.expectThrows(PulsarClientException.MemoryBufferIsFullError.class, () -> producer.send(new byte[10]));
+
+        // 4. ConsumerBase#reduceCurrentReceiverQueueSize has already been called, halving c1's queue size from 10 to 5.
+        log.info("RQS:{}", c1.getCurrentReceiverQueueSize());
+        Assert.assertEquals(c1.getCurrentReceiverQueueSize(), 5);
+
+        for (int i = 0; i < n; i++) {
+            c1.receive();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index c0d6e2b1258..168b903b1cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -69,6 +70,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
     protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
+    protected static final double MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;
 
     protected final String subscription;
     protected final ConsumerConfigurationData<T> conf;
@@ -166,20 +168,40 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         initReceiverQueueSize();
     }
 
+    public void initReceiverQueueSize() {
+        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, minReceiverQueueSize());
+        } else {
+            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+        }
+    }
 
-    public abstract void initReceiverQueueSize();
+    public abstract int minReceiverQueueSize();
 
     protected void expectMoreIncomingMessages() {
         if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
             return;
         }
-        if (scaleReceiverQueueHint.compareAndSet(true, false)) {
+        double usage = getMemoryLimitController().map(MemoryLimitController::currentUsagePercent).orElse(0d);
+        if (usage < MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION
+                 && scaleReceiverQueueHint.compareAndSet(true, false)) {
             int oldSize = getCurrentReceiverQueueSize();
             int newSize = Math.min(maxReceiverQueueSize, oldSize * 2);
             setCurrentReceiverQueueSize(newSize);
         }
     }
 
+    protected void reduceCurrentReceiverQueueSize() {
+        if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            return;
+        }
+        int oldSize = getCurrentReceiverQueueSize();
+        int newSize = Math.max(minReceiverQueueSize(), oldSize / 2);
+        if (oldSize > newSize) {
+            setCurrentReceiverQueueSize(newSize);
+        }
+    }
+
     @Override
     public Message<T> receive() throws PulsarClientException {
         if (listener != null) {
@@ -795,6 +817,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
             // anymore, since for pooled messages, this instance was possibly already been released and recycled.
             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
+            getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
             updateAutoScaleReceiverQueueHint();
         }
         return hasEnoughMessagesForBatchReceive();
@@ -1055,12 +1078,23 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return pendingBatchReceives != null && hasNextBatchReceive();
     }
 
+    Optional<MemoryLimitController> getMemoryLimitController() {
+        if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
+            // Memory limiting is disabled unless auto-scaled receiver queue size is enabled.
+            return Optional.empty();
+        } else {
+            return Optional.of(client.getMemoryLimitController());
+        }
+    }
+
     protected void resetIncomingMessageSize() {
-        INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
+        long oldSize = INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
+        getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(oldSize));
     }
 
     protected void decreaseIncomingMessageSize(final Message<?> message) {
         INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
+        getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
     }
 
     public long getIncomingMessageSize() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2bfe917e623..cb676b1062d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -417,18 +417,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     @Override
-    public void initReceiverQueueSize() {
-        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
-            // turn on autoScaledReceiverQueueSize
-            int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
-            if (batchReceivePolicy.getMaxNumMessages() > 0) {
-                // consumerImpl may store (half-1) permits locally.
-                size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
-            }
-            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
-        } else {
-            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+    public int minReceiverQueueSize() {
+        int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+        if (batchReceivePolicy.getMaxNumMessages() > 0) {
+            // consumerImpl may store (half-1) permits locally.
+            size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
         }
+        return size;
     }
 
     @Override
@@ -1940,7 +1935,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     protected void updateAutoScaleReceiverQueueHint() {
-        scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+        boolean prev = scaleReceiverQueueHint.getAndSet(
+                getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+        if (log.isDebugEnabled() && prev != scaleReceiverQueueHint.get()) {
+            log.debug("updateAutoScaleReceiverQueueHint {} -> {}", prev, scaleReceiverQueueHint.get());
+        }
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index c3a78e49145..e4d8388a02e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -25,12 +25,27 @@ import java.util.concurrent.locks.ReentrantLock;
 public class MemoryLimitController {
 
     private final long memoryLimit;
+    private final long triggerThreshold;
+    private final Runnable trigger;
     private final AtomicLong currentUsage = new AtomicLong();
     private final ReentrantLock mutex = new ReentrantLock(false);
     private final Condition condition = mutex.newCondition();
 
     public MemoryLimitController(long memoryLimitBytes) {
         this.memoryLimit = memoryLimitBytes;
+        triggerThreshold = 0;
+        trigger = null;
+    }
+
+    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
+        this.memoryLimit = memoryLimitBytes;
+        this.triggerThreshold = triggerThreshold;
+        this.trigger = trigger;
+    }
+
+    public void forceReserveMemory(long size) {
+        long newUsage = currentUsage.addAndGet(size);
+        checkTrigger(newUsage - size, newUsage);
     }
 
     public boolean tryReserveMemory(long size) {
@@ -45,11 +60,18 @@ public class MemoryLimitController {
             }
 
             if (currentUsage.compareAndSet(current, newUsage)) {
+                checkTrigger(current, newUsage);
                 return true;
             }
         }
     }
 
+    private void checkTrigger(long prevUsage, long newUsage) {
+        if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
+            trigger.run();
+        }
+    }
+
     public void reserveMemory(long size) throws InterruptedException {
         if (!tryReserveMemory(size)) {
             mutex.lock();
@@ -80,4 +102,8 @@ public class MemoryLimitController {
     public long currentUsage() {
         return currentUsage.get();
     }
+
+    public double currentUsagePercent() {
+        return 1.0 * currentUsage.get() / memoryLimit;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 54cef152013..28d1f6cc7a3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -338,16 +338,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     @Override
-    public void initReceiverQueueSize() {
-        if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
-            int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
-            if (batchReceivePolicy.getMaxNumMessages() > 0) {
-                size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
-            }
-            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
-        } else {
-            CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+    public int minReceiverQueueSize() {
+        int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+        if (batchReceivePolicy.getMaxNumMessages() > 0) {
+            size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
+        }
+        if (allTopicPartitionsNumber != null) {
+            size = Math.max(allTopicPartitionsNumber.get(), size);
         }
+        return size;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 890ea655059..e00f1f2e97c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -93,6 +93,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
     private static final int CLOSE_TIMEOUT_SECONDS = 60;
+    private static final double THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
 
     protected final ClientConfigurationData conf;
     private final boolean createdExecutorProviders;
@@ -211,7 +212,9 @@ public class PulsarClientImpl implements PulsarClient {
                 }
             }
 
-            memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes());
+            memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes(),
+                    (long) (conf.getMemoryLimitBytes() * THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
+                    this::reduceConsumerReceiverQueueSize);
             state.set(State.Open);
         } catch (Throwable t) {
             shutdown();
@@ -221,6 +224,12 @@ public class PulsarClientImpl implements PulsarClient {
         }
     }
 
+    private void reduceConsumerReceiverQueueSize() {
+        for (ConsumerBase<?> consumer : consumers) {
+            consumer.reduceCurrentReceiverQueueSize();
+        }
+    }
+
     private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
         if (StringUtils.isBlank(conf.getAuthPluginClassName())
                 || (StringUtils.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index df7ca4789fd..b56b97a7acd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -56,6 +56,11 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
                 createTopicIfDoesNotExist);
     }
 
+    @Override
+    public int minReceiverQueueSize() {
+        return 0;
+    }
+
     @Override
     public void initReceiverQueueSize() {
         if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
@@ -63,7 +68,6 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
         } else {
             CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0);
         }
-
     }
 
     @Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
index a21f1cf2979..c94a8365eef 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
@@ -21,12 +21,12 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -54,12 +54,47 @@ public class MemoryLimitControllerTest {
         }
 
         assertEquals(mlc.currentUsage(), 101);
+        assertEquals(mlc.currentUsagePercent(), 1.01);
         assertFalse(mlc.tryReserveMemory(1));
         mlc.releaseMemory(1);
         assertEquals(mlc.currentUsage(), 100);
+        assertEquals(mlc.currentUsagePercent(), 1.0);
 
         assertTrue(mlc.tryReserveMemory(1));
         assertEquals(mlc.currentUsage(), 101);
+
+        mlc.forceReserveMemory(99);
+        assertFalse(mlc.tryReserveMemory(1));
+        assertEquals(mlc.currentUsagePercent(), 2.0);
+
+        mlc.releaseMemory(50);
+        assertFalse(mlc.tryReserveMemory(1));
+        assertEquals(mlc.currentUsagePercent(), 1.5);
+
+        mlc.releaseMemory(50);
+        assertTrue(mlc.tryReserveMemory(1));
+        assertEquals(mlc.currentUsagePercent(), 1.01);
+    }
+
+    @Test
+    public void testTrigger() {
+        AtomicBoolean trigger = new AtomicBoolean(false);
+        MemoryLimitController mlc = new MemoryLimitController(100, 95, () -> trigger.set(true));
+
+        mlc.forceReserveMemory(94);
+        Assert.assertFalse(trigger.get());
+        mlc.forceReserveMemory(1);
+        Assert.assertTrue(trigger.get());
+
+        trigger.set(false);
+        for (int i = 0; i < 5; i++) {
+            mlc.forceReserveMemory(1);
+            Assert.assertFalse(trigger.get());
+        }
+
+        mlc.releaseMemory(50);
+        Assert.assertTrue(mlc.tryReserveMemory(50));
+        Assert.assertTrue(trigger.get());
     }
 
     @Test