You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/02 17:45:49 UTC
[pulsar] branch master updated: Improved in max-pending-bytes mechanism for broker (#7406)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2a522c8 Improved in max-pending-bytes mechanism for broker (#7406)
2a522c8 is described below
commit 2a522c85c0add3f1f647befb0a560689324114bd
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun May 2 10:44:54 2021 -0700
Improved in max-pending-bytes mechanism for broker (#7406)
* Improved in max-pending-bytes mechanism for broker
* Fixed imports
* Switched to LongAdder
---
conf/broker.conf | 4 -
.../apache/pulsar/broker/ServiceConfiguration.java | 1 +
.../pulsar/broker/service/AbstractTopic.java | 8 +-
.../pulsar/broker/service/BrokerService.java | 63 +++-------
.../apache/pulsar/broker/service/ServerCnx.java | 127 ++++++++++++++-------
.../apache/pulsar/broker/service/TransportCnx.java | 2 -
.../service/MessagePublishBufferThrottleTest.java | 126 ++++++++++----------
.../bookkeeper/client/PulsarMockBookKeeper.java | 26 ++++-
.../bookkeeper/client/PulsarMockLedgerHandle.java | 7 +-
9 files changed, 190 insertions(+), 174 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 8238389..ce94927 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -474,10 +474,6 @@ replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
# Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
maxMessagePublishBufferSizeInMB=
-# Interval between checks to see if message publish buffer size is exceed the max message publish buffer size
-# Use 0 or negative number to disable the max publish buffer limiting.
-messagePublishBufferCheckIntervalInMillis=100
-
# Check between intervals to see if consumed ledgers need to be trimmed
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a993402..0f08d1a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -414,6 +414,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "requests in memory. Default: 1000"
)
private int maxPendingPublishRequestsPerConnection = 1000;
+
@FieldContext(
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired messages"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cf07578..23178b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -270,16 +270,12 @@ public abstract class AbstractTopic implements Topic {
@Override
public void disableCnxAutoRead() {
- if (producers != null) {
- producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
- }
+ producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
@Override
public void enableCnxAutoRead() {
- if (producers != null) {
- producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
- }
+ producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
protected boolean hasLocalProducers() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e43385b..1b855f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,7 +26,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
@@ -67,7 +66,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
@@ -232,7 +230,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
- private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
@@ -269,18 +266,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private Channel listenChannelTls;
private boolean preciseTopicPublishRateLimitingEnable;
- private final long maxMessagePublishBufferBytes;
- private final long resumeProducerReadMessagePublishBufferBytes;
- private volatile boolean reachMessagePublishBufferThreshold;
+ private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
- this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0
- ? pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1;
- this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
@@ -322,9 +314,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-compaction-monitor"));
- this.messagePublishBufferMonitor =
- Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
@@ -474,7 +463,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
- this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
@@ -552,14 +540,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
- protected void startMessagePublishBufferMonitor() {
- int interval = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMillis();
- if (interval > 0 && maxMessagePublishBufferBytes > 0) {
- messagePublishBufferMonitor.scheduleAtFixedRate(safeRun(this::checkMessagePublishBuffer),
- interval, interval, TimeUnit.MILLISECONDS);
- }
- }
-
protected void startConsumedLedgersMonitor() {
int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
if (interval > 0) {
@@ -757,7 +737,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
inactivityMonitor,
messageExpiryMonitor,
compactionMonitor,
- messagePublishBufferMonitor,
consumedLedgersMonitor,
backlogQuotaChecker,
topicOrderedExecutor,
@@ -2487,22 +2466,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
- @VisibleForTesting
- void checkMessagePublishBuffer() {
- AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
- foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
- if (currentMessagePublishBufferBytes.get() >= maxMessagePublishBufferBytes
- && !reachMessagePublishBufferThreshold) {
- reachMessagePublishBufferThreshold = true;
- forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead());
- }
- if (currentMessagePublishBufferBytes.get() < resumeProducerReadMessagePublishBufferBytes
- && reachMessagePublishBufferThreshold) {
- reachMessagePublishBufferThreshold = false;
- forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting());
- }
- }
-
private void foreachCnx(Consumer<TransportCnx> consumer) {
Set<TransportCnx> cnxSet = new HashSet<>();
topics.forEach((n, t) -> {
@@ -2512,17 +2475,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
cnxSet.forEach(consumer);
}
- public boolean isReachMessagePublishBufferThreshold() {
- return reachMessagePublishBufferThreshold;
- }
-
- @VisibleForTesting
- long getCurrentMessagePublishBufferSize() {
- AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
- foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
- return currentMessagePublishBufferBytes.get();
- }
-
public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
@@ -2691,7 +2643,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
public boolean isBrokerEntryMetadataEnabled() {
- return brokerEntryMetadataInterceptors.size() > 0;
+ return !brokerEntryMetadataInterceptors.isEmpty();
}
+ public void pausedConnections(int numberOfConnections) {
+ pausedConnections.add(numberOfConnections);
+ }
+
+ public void resumedConnections(int numberOfConnections) {
+ pausedConnections.add(-numberOfConnections);
+ }
+
+ public long getPausedConnections() {
+ return pausedConnections.longValue();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 09257f0..40a5948 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -32,10 +32,13 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.prometheus.client.Gauge;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
@@ -43,7 +46,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
@@ -56,7 +58,10 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -182,16 +187,33 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
private FeatureFlags features;
- // Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
- private volatile boolean autoReadDisabledPublishBufferLimiting = false;
- private static final AtomicLongFieldUpdater<ServerCnx> MSG_PUBLISH_BUFFER_SIZE_UPDATER =
- AtomicLongFieldUpdater.newUpdater(ServerCnx.class, "messagePublishBufferSize");
- private volatile long messagePublishBufferSize = 0;
+
private PulsarCommandSender commandSender;
private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta()
.setKeySharedMode(KeySharedMode.AUTO_SPLIT);
+ // Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
+ private boolean autoReadDisabledPublishBufferLimiting = false;
+ private final long maxPendingBytesPerThread;
+ private final long resumeThresholdPendingBytesPerThread;
+
+ // Number of bytes pending to be published from a single specific IO thread.
+ private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
+ @Override
+ protected MutableLong initialValue() throws Exception {
+ return new MutableLong();
+ }
+ };
+
+ // A set of connections tied to the current thread
+ private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>() {
+ @Override
+ protected Set<ServerCnx> initialValue() throws Exception {
+ return Collections.newSetFromMap(new IdentityHashMap<>());
+ }
+ };
+
enum State {
Start, Connected, Failed, Connecting
}
@@ -201,22 +223,26 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
this.service = pulsar.getBrokerService();
this.schemaService = pulsar.getSchemaRegistryService();
this.state = State.Start;
+ ServiceConfiguration conf = pulsar.getConfiguration();
// This maps are not heavily contended since most accesses are within the cnx thread
this.producers = new ConcurrentLongHashMap<>(8, 1);
this.consumers = new ConcurrentLongHashMap<>(8, 1);
- this.replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix();
- this.maxNonPersistentPendingMessages = service.pulsar().getConfiguration()
- .getMaxConcurrentNonPersistentMessagePerConnection();
- this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
- this.authenticateOriginalAuthData = service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
- this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
- this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
- this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishRequestsPerConnection();
+ this.replicatorPrefix = conf.getReplicatorPrefix();
+ this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
+ this.proxyRoles = conf.getProxyRoles();
+ this.authenticateOriginalAuthData = conf.isAuthenticateOriginalAuthData();
+ this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
+ this.maxMessageSize = conf.getMaxMessageSize();
+ this.maxPendingSendRequests = conf.getMaxPendingPublishRequestsPerConnection();
this.resumeReadsThreshold = maxPendingSendRequests / 2;
- this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
- this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
- this.encryptionRequireOnProducer = pulsar.getConfiguration().isEncryptionRequireOnProducer();
+ this.preciseDispatcherFlowControl = conf.isPreciseDispatcherFlowControl();
+ this.preciseTopicPublishRateLimitingEnable = conf.isPreciseTopicPublishRateLimiterEnable();
+ this.encryptionRequireOnProducer = conf.isEncryptionRequireOnProducer();
+ // Assign a portion of max-pending bytes to each IO thread
+ this.maxPendingBytesPerThread = conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
+ / conf.getNumIOThreads();
+ this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
}
@Override
@@ -226,6 +252,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
this.ctx = ctx;
this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
this.service.getPulsarStats().recordConnectionCreate();
+ cnxsPerThread.get().add(this);
}
@Override
@@ -237,6 +264,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionClosed(this);
}
+
+ cnxsPerThread.get().remove(this);
+
// Connection is gone, close the producers immediately
producers.values().forEach((producerFuture) -> {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
@@ -2149,7 +2179,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
.register();
public void startSendOperation(Producer producer, int msgSize, int numMessages) {
- MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
boolean isPublishRateExceeded = false;
if (preciseTopicPublishRateLimitingEnable) {
boolean isPreciseTopicPublishRateExceeded =
@@ -2170,22 +2199,43 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
autoReadDisabledRateLimiting = isPublishRateExceeded;
throttledConnections.inc();
}
- if (getBrokerService().isReachMessagePublishBufferThreshold()) {
- ctx.channel().config().setAutoRead(false);
- autoReadDisabledPublishBufferLimiting = true;
- throttledConnectionsGlobal.inc();
+
+ if (pendingBytesPerThread.get().addAndGet(msgSize) >= maxPendingBytesPerThread
+ && !autoReadDisabledPublishBufferLimiting
+ && maxPendingBytesPerThread > 0) {
+ // Disable reading from all the connections associated with this thread
+ MutableInt pausedConnections = new MutableInt();
+ cnxsPerThread.get().forEach(cnx -> {
+ if (cnx.hasProducers() && !cnx.autoReadDisabledPublishBufferLimiting) {
+ cnx.disableCnxAutoRead();
+ cnx.autoReadDisabledPublishBufferLimiting = true;
+ pausedConnections.increment();
+ }
+ });
+
+ getBrokerService().pausedConnections(pausedConnections.intValue());
}
}
@Override
public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
- MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
- if (--pendingSendRequest == resumeReadsThreshold && !ctx.channel().config().isAutoRead()) {
- // Resume reading from socket
- ctx.channel().config().setAutoRead(true);
- // triggers channel read if autoRead couldn't trigger it
- ctx.read();
- throttledConnections.dec();
+ if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread
+ && autoReadDisabledPublishBufferLimiting) {
+ // Re-enable reading on all the blocked connections
+ MutableInt resumedConnections = new MutableInt();
+ cnxsPerThread.get().forEach(cnx -> {
+ if (cnx.autoReadDisabledPublishBufferLimiting) {
+ cnx.autoReadDisabledPublishBufferLimiting = false;
+ cnx.enableCnxAutoRead();
+ resumedConnections.increment();
+ }
+ });
+
+ getBrokerService().resumedConnections(resumedConnections.intValue());
+ }
+
+ if (--pendingSendRequest == resumeReadsThreshold) {
+ enableCnxAutoRead();
}
if (isNonPersistentTopic) {
nonPersistentPendingMessages--;
@@ -2203,6 +2253,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.channel().config().setAutoRead(true);
// triggers channel read
ctx.read();
+ throttledConnections.dec();
}
}
@@ -2351,16 +2402,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
return clientVersion;
}
- @Override
- public long getMessagePublishBufferSize() {
- return this.messagePublishBufferSize;
- }
-
- @VisibleForTesting
- void setMessagePublishBufferSize(long bufferSize) {
- this.messagePublishBufferSize = bufferSize;
- }
-
@VisibleForTesting
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
this.autoReadDisabledRateLimiting = isLimiting;
@@ -2432,10 +2473,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
- remoteAddress, operation, principal, topicString, ex.getMessage());
+ remoteAddress, operation, principal, topicString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
- remoteAddress, operation, principal, topicString, ex);
+ remoteAddress, operation, principal, topicString, ex);
}
}
+
+ public boolean hasProducers() {
+ return !producers.isEmpty();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index af1b42e..cc0c559 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -54,8 +54,6 @@ public interface TransportCnx {
void closeProducer(Producer producer);
- long getMessagePublishBufferSize();
-
void cancelPublishRateLimiting();
void cancelPublishBufferLimiting();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index c8b94eb..739a7c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -18,10 +18,13 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.util.FutureUtil;
@@ -48,75 +51,64 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
@Test
public void testMessagePublishBufferThrottleDisabled() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(-1);
- conf.setMessagePublishBufferCheckIntervalInMillis(10);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();
- Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
- Assert.assertNotNull(topicRef);
- TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
- ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
- Thread.sleep(20);
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- List<CompletableFuture<MessageId>> futures = new ArrayList<>();
- // Make sure the producer can publish succeed.
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+ mockBookKeeper.addEntryDelay(1, TimeUnit.SECONDS);
+
+ // Make sure the producer can publish successfully
+ byte[] payload = new byte[1024 * 1024];
for (int i = 0; i < 10; i++) {
- futures.add(producer.sendAsync(new byte[1024 * 1024]));
+ producer.sendAsync(payload);
}
- FutureUtil.waitForAll(futures).get();
- for (CompletableFuture<MessageId> future : futures) {
- Assert.assertNotNull(future.get());
- }
- Thread.sleep(20);
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+ producer.flush();
+
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
}
@Test
public void testMessagePublishBufferThrottleEnable() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(1);
- conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
super.baseSetup();
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();
- Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
- Assert.assertNotNull(topicRef);
- TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
- ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- // The first message can publish success, but the second message should be blocked
- producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
- getPulsar().getBrokerService().checkMessagePublishBuffer();
- Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-
- ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
- getPulsar().getBrokerService().checkMessagePublishBuffer();
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- List<CompletableFuture<MessageId>> futures = new ArrayList<>();
- // Make sure the producer can publish succeed.
+
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+ mockBookKeeper.addEntryDelay(1, TimeUnit.SECONDS);
+
+ byte[] payload = new byte[1024 * 1024];
for (int i = 0; i < 10; i++) {
- futures.add(producer.sendAsync(new byte[1024 * 1024]));
- }
- FutureUtil.waitForAll(futures).get();
- for (CompletableFuture<MessageId> future : futures) {
- Assert.assertNotNull(future.get());
+ producer.sendAsync(payload);
}
+
Awaitility.await().untilAsserted(
- () -> Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L));
+ () -> Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 1L));
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+
+ producer.flush();
+
+ Awaitility.await().untilAsserted(
+ () -> Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 0L));
+
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
}
@Test
public void testBlockByPublishRateLimiting() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(1);
- conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
super.baseSetup();
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
final String topic = "persistent://prop/ns-abc/testBlockByPublishRateLimiting";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -124,36 +116,38 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
.create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
- TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
- ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+ mockBookKeeper.addEntryDelay(5, TimeUnit.SECONDS);
// Block by publish buffer.
- getPulsar().getBrokerService().checkMessagePublishBuffer();
- Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+ byte[] payload = new byte[1024 * 1024];
+ for (int i = 0; i < 10; i++) {
+ producer.sendAsync(payload);
+ }
+
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+
+ CompletableFuture<Void> flushFuture = producer.flushAsync();
// Block by publish rate.
- ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
- getPulsar().getBrokerService().checkMessagePublishBuffer();
- ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(true);
- cnx.disableCnxAutoRead();
- cnx.enableCnxAutoRead();
+ // After 1 second, the message buffer throttling will be lifted, but the rate limiting will still be in place.
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+ try {
+ flushFuture.get(2, TimeUnit.SECONDS);
+ fail("Should have timed out");
+ } catch (TimeoutException e) {
+ // Ok
+ }
+
+ flushFuture.join();
+
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
// Resume message publish.
- ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(false);
- cnx.enableCnxAutoRead();
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- List<CompletableFuture<MessageId>> futures = new ArrayList<>();
- // Make sure the producer can publish succeed.
- for (int i = 0; i < 10; i++) {
- futures.add(producer.sendAsync(new byte[1024 * 1024]));
- }
- FutureUtil.waitForAll(futures).get();
- for (CompletableFuture<MessageId> future : futures) {
- Assert.assertNotNull(future.get());
- }
- Awaitility.await().untilAsserted(
- () -> Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L));
+ ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+
+ flushFuture.get();
+ assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
}
}
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index ce71e5f..e646049 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,10 +18,24 @@
*/
package org.apache.bookkeeper.client;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import com.google.common.collect.Lists;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
@@ -33,7 +47,6 @@ import org.apache.bookkeeper.client.impl.OpenBuilderBase;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient;
-import org.apache.bookkeeper.discover.ZKRegistrationClient;
import org.apache.bookkeeper.meta.LayoutManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataClientDriver;
@@ -80,6 +93,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
return ensemble;
}
+ Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
List<CompletableFuture<Void>> failures = new ArrayList<>();
public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) throws Exception {
@@ -330,6 +344,10 @@ public class PulsarMockBookKeeper extends BookKeeper {
return promise;
}
+ public synchronized void addEntryDelay(long delay, TimeUnit unit) {
+ addEntryDelaysMillis.add(unit.toMillis(delay));
+ }
+
static int getExceptionCode(Throwable t) {
if (t instanceof BKException) {
return ((BKException) t).getCode();
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 3031532..8a62e42 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -168,8 +168,13 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
@Override
public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) {
bk.getProgrammedFailure().thenComposeAsync((res) -> {
+ Long delayMillis = bk.addEntryDelaysMillis.poll();
+ if (delayMillis == null) {
+ delayMillis = 1L;
+ }
+
try {
- Thread.sleep(1);
+ Thread.sleep(delayMillis);
} catch (InterruptedException e) {
}