You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/05/07 01:59:59 UTC
[pulsar] branch master updated: PIP-74 add support consumer client memory limit (#15216)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1a098d5418b PIP-74 add support consumer client memory limit (#15216)
1a098d5418b is described below
commit 1a098d5418be7d4674f9ba9dbc4da0018f91654a
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Sat May 7 09:59:52 2022 +0800
PIP-74 add support consumer client memory limit (#15216)
---
.../client/impl/ConsumerMemoryLimitTest.java | 111 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 40 +++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 23 ++---
.../pulsar/client/impl/MemoryLimitController.java | 26 +++++
.../client/impl/MultiTopicsConsumerImpl.java | 17 ++--
.../pulsar/client/impl/PulsarClientImpl.java | 11 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 6 +-
.../client/impl/MemoryLimitControllerTest.java | 39 +++++++-
8 files changed, 245 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
new file mode 100644
index 00000000000..3c621fb83f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+@Slf4j
+public class ConsumerMemoryLimitTest extends ProducerConsumerBase {
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testConsumerMemoryLimit() throws Exception {
+ String topic = newTopicName();
+
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .memoryLimit(10, SizeUnit.KILO_BYTES);
+
+ @Cleanup
+ PulsarTestClient client = PulsarTestClient.create(clientBuilder);
+
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topic).enableBatching(false)
+ .blockIfQueueFull(false)
+ .create();
+
+ @Cleanup
+ ConsumerImpl<byte[]> c1 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub").topic(topic)
+ .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+ @Cleanup
+ ConsumerImpl<byte[]> c2 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub2").topic(topic)
+ .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+ c2.updateAutoScaleReceiverQueueHint();
+ int n = 5;
+ for (int i = 0; i < n; i++) {
+ producer.send(new byte[3000]);
+ }
+ Awaitility.await().until(c1.scaleReceiverQueueHint::get);
+
+
+ c1.setCurrentReceiverQueueSize(10);
+ Awaitility.await().until(() -> c1.incomingMessages.size() == n);
+ log.info("memory usage:{}", client.getMemoryLimitController().currentUsagePercent());
+
+ //1. check memory limit reached,
+ Assert.assertTrue(client.getMemoryLimitController().currentUsagePercent() > 1);
+
+ //2. check c2 can't expand receiver queue.
+ Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);
+ for (int i = 0; i < n; i++) {
+ Awaitility.await().until(() -> c2.incomingMessages.size() == 1);
+ Assert.assertNotNull(c2.receive());
+ }
+ Assert.assertTrue(c2.scaleReceiverQueueHint.get());
+ c2.receiveAsync(); //this should trigger c2 receiver queue size expansion.
+ Awaitility.await().until(() -> !c2.pendingReceives.isEmpty()); //make sure expectMoreIncomingMessages is called.
+ Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);
+
+ //3. producer can't send message;
+ Assert.expectThrows(PulsarClientException.MemoryBufferIsFullError.class, () -> producer.send(new byte[10]));
+
+ //4. ConsumerBase#reduceCurrentReceiverQueueSize is called already. Queue size reduced to 5.
+ log.info("RQS:{}", c1.getCurrentReceiverQueueSize());
+ Assert.assertEquals(c1.getCurrentReceiverQueueSize(), 5);
+
+ for (int i = 0; i < n; i++) {
+ c1.receive();
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index c0d6e2b1258..168b903b1cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -69,6 +70,7 @@ import org.slf4j.LoggerFactory;
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
+ protected static final double MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;
protected final String subscription;
protected final ConsumerConfigurationData<T> conf;
@@ -166,20 +168,40 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
initReceiverQueueSize();
}
+ public void initReceiverQueueSize() {
+ if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, minReceiverQueueSize());
+ } else {
+ CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+ }
+ }
- public abstract void initReceiverQueueSize();
+ public abstract int minReceiverQueueSize();
protected void expectMoreIncomingMessages() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
return;
}
- if (scaleReceiverQueueHint.compareAndSet(true, false)) {
+ double usage = getMemoryLimitController().map(MemoryLimitController::currentUsagePercent).orElse(0d);
+ if (usage < MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION
+ && scaleReceiverQueueHint.compareAndSet(true, false)) {
int oldSize = getCurrentReceiverQueueSize();
int newSize = Math.min(maxReceiverQueueSize, oldSize * 2);
setCurrentReceiverQueueSize(newSize);
}
}
+ protected void reduceCurrentReceiverQueueSize() {
+ if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ return;
+ }
+ int oldSize = getCurrentReceiverQueueSize();
+ int newSize = Math.max(minReceiverQueueSize(), oldSize / 2);
+ if (oldSize > newSize) {
+ setCurrentReceiverQueueSize(newSize);
+ }
+ }
+
@Override
public Message<T> receive() throws PulsarClientException {
if (listener != null) {
@@ -795,6 +817,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
+ getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
return hasEnoughMessagesForBatchReceive();
@@ -1055,12 +1078,23 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return pendingBatchReceives != null && hasNextBatchReceive();
}
+ Optional<MemoryLimitController> getMemoryLimitController() {
+ if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
+ //disable memory limit.
+ return Optional.empty();
+ } else {
+ return Optional.of(client.getMemoryLimitController());
+ }
+ }
+
protected void resetIncomingMessageSize() {
- INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
+ long oldSize = INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
+ getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(oldSize));
}
protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
+ getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
}
public long getIncomingMessageSize() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2bfe917e623..cb676b1062d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -417,18 +417,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
@Override
- public void initReceiverQueueSize() {
- if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
- // turn on autoScaledReceiverQueueSize
- int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
- if (batchReceivePolicy.getMaxNumMessages() > 0) {
- // consumerImpl may store (half-1) permits locally.
- size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
- }
- CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
- } else {
- CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+ public int minReceiverQueueSize() {
+ int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+ if (batchReceivePolicy.getMaxNumMessages() > 0) {
+ // consumerImpl may store (half-1) permits locally.
+ size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
}
+ return size;
}
@Override
@@ -1940,7 +1935,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@Override
protected void updateAutoScaleReceiverQueueHint() {
- scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+ boolean prev = scaleReceiverQueueHint.getAndSet(
+ getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+ if (log.isDebugEnabled() && prev != scaleReceiverQueueHint.get()) {
+ log.debug("updateAutoScaleReceiverQueueHint {} -> {}", prev, scaleReceiverQueueHint.get());
+ }
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index c3a78e49145..e4d8388a02e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -25,12 +25,27 @@ import java.util.concurrent.locks.ReentrantLock;
public class MemoryLimitController {
private final long memoryLimit;
+ private final long triggerThreshold;
+ private final Runnable trigger;
private final AtomicLong currentUsage = new AtomicLong();
private final ReentrantLock mutex = new ReentrantLock(false);
private final Condition condition = mutex.newCondition();
public MemoryLimitController(long memoryLimitBytes) {
this.memoryLimit = memoryLimitBytes;
+ triggerThreshold = 0;
+ trigger = null;
+ }
+
+ public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
+ this.memoryLimit = memoryLimitBytes;
+ this.triggerThreshold = triggerThreshold;
+ this.trigger = trigger;
+ }
+
+ public void forceReserveMemory(long size) {
+ long newUsage = currentUsage.addAndGet(size);
+ checkTrigger(newUsage - size, newUsage);
}
public boolean tryReserveMemory(long size) {
@@ -45,11 +60,18 @@ public class MemoryLimitController {
}
if (currentUsage.compareAndSet(current, newUsage)) {
+ checkTrigger(current, newUsage);
return true;
}
}
}
+ private void checkTrigger(long prevUsage, long newUsage) {
+ if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
+ trigger.run();
+ }
+ }
+
public void reserveMemory(long size) throws InterruptedException {
if (!tryReserveMemory(size)) {
mutex.lock();
@@ -80,4 +102,8 @@ public class MemoryLimitController {
public long currentUsage() {
return currentUsage.get();
}
+
+ public double currentUsagePercent() {
+ return 1.0 * currentUsage.get() / memoryLimit;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 54cef152013..28d1f6cc7a3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -338,16 +338,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
@Override
- public void initReceiverQueueSize() {
- if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
- int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
- if (batchReceivePolicy.getMaxNumMessages() > 0) {
- size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
- }
- CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
- } else {
- CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
+ public int minReceiverQueueSize() {
+ int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
+ if (batchReceivePolicy.getMaxNumMessages() > 0) {
+ size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
+ }
+ if (allTopicPartitionsNumber != null) {
+ size = Math.max(allTopicPartitionsNumber.get(), size);
}
+ return size;
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 890ea655059..e00f1f2e97c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -93,6 +93,7 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
private static final int CLOSE_TIMEOUT_SECONDS = 60;
+ private static final double THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
@@ -211,7 +212,9 @@ public class PulsarClientImpl implements PulsarClient {
}
}
- memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes());
+ memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes(),
+ (long) (conf.getMemoryLimitBytes() * THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
+ this::reduceConsumerReceiverQueueSize);
state.set(State.Open);
} catch (Throwable t) {
shutdown();
@@ -221,6 +224,12 @@ public class PulsarClientImpl implements PulsarClient {
}
}
+ private void reduceConsumerReceiverQueueSize() {
+ for (ConsumerBase<?> consumer : consumers) {
+ consumer.reduceCurrentReceiverQueueSize();
+ }
+ }
+
private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (StringUtils.isBlank(conf.getAuthPluginClassName())
|| (StringUtils.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index df7ca4789fd..b56b97a7acd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -56,6 +56,11 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
createTopicIfDoesNotExist);
}
+ @Override
+ public int minReceiverQueueSize() {
+ return 0;
+ }
+
@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
@@ -63,7 +68,6 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0);
}
-
}
@Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
index a21f1cf2979..c94a8365eef 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
@@ -21,12 +21,12 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -54,12 +54,47 @@ public class MemoryLimitControllerTest {
}
assertEquals(mlc.currentUsage(), 101);
+ assertEquals(mlc.currentUsagePercent(), 1.01);
assertFalse(mlc.tryReserveMemory(1));
mlc.releaseMemory(1);
assertEquals(mlc.currentUsage(), 100);
+ assertEquals(mlc.currentUsagePercent(), 1.0);
assertTrue(mlc.tryReserveMemory(1));
assertEquals(mlc.currentUsage(), 101);
+
+ mlc.forceReserveMemory(99);
+ assertFalse(mlc.tryReserveMemory(1));
+ assertEquals(mlc.currentUsagePercent(), 2.0);
+
+ mlc.releaseMemory(50);
+ assertFalse(mlc.tryReserveMemory(1));
+ assertEquals(mlc.currentUsagePercent(), 1.5);
+
+ mlc.releaseMemory(50);
+ assertTrue(mlc.tryReserveMemory(1));
+ assertEquals(mlc.currentUsagePercent(), 1.01);
+ }
+
+ @Test
+ public void testTrigger() {
+ AtomicBoolean trigger = new AtomicBoolean(false);
+ MemoryLimitController mlc = new MemoryLimitController(100, 95, () -> trigger.set(true));
+
+ mlc.forceReserveMemory(94);
+ Assert.assertFalse(trigger.get());
+ mlc.forceReserveMemory(1);
+ Assert.assertTrue(trigger.get());
+
+ trigger.set(false);
+ for (int i = 0; i < 5; i++) {
+ mlc.forceReserveMemory(1);
+ Assert.assertFalse(trigger.get());
+ }
+
+ mlc.releaseMemory(50);
+ Assert.assertTrue(mlc.tryReserveMemory(50));
+ Assert.assertTrue(trigger.get());
}
@Test