You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/02 17:45:49 UTC

[pulsar] branch master updated: Improved in max-pending-bytes mechanism for broker (#7406)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a522c8  Improved in max-pending-bytes mechanism for broker (#7406)
2a522c8 is described below

commit 2a522c85c0add3f1f647befb0a560689324114bd
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun May 2 10:44:54 2021 -0700

    Improved in max-pending-bytes mechanism for broker (#7406)
    
    * Improved the max-pending-bytes mechanism for the broker
    
    * Fixed imports
    
    * Switched to LongAdder
---
 conf/broker.conf                                   |   4 -
 .../apache/pulsar/broker/ServiceConfiguration.java |   1 +
 .../pulsar/broker/service/AbstractTopic.java       |   8 +-
 .../pulsar/broker/service/BrokerService.java       |  63 +++-------
 .../apache/pulsar/broker/service/ServerCnx.java    | 127 ++++++++++++++-------
 .../apache/pulsar/broker/service/TransportCnx.java |   2 -
 .../service/MessagePublishBufferThrottleTest.java  | 126 ++++++++++----------
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  26 ++++-
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |   7 +-
 9 files changed, 190 insertions(+), 174 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8238389..ce94927 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -474,10 +474,6 @@ replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
 # Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
 maxMessagePublishBufferSizeInMB=
 
-# Interval between checks to see if message publish buffer size is exceed the max message publish buffer size
-# Use 0 or negative number to disable the max publish buffer limiting.
-messagePublishBufferCheckIntervalInMillis=100
-
 # Check between intervals to see if consumed ledgers need to be trimmed
 # Use 0 or negative number to disable the check
 retentionCheckIntervalInSeconds=120
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a993402..0f08d1a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -414,6 +414,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
                 + "requests in memory. Default: 1000"
     )
     private int maxPendingPublishRequestsPerConnection = 1000;
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "How frequently to proactively check and purge expired messages"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cf07578..23178b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -270,16 +270,12 @@ public abstract class AbstractTopic implements Topic {
 
     @Override
     public void disableCnxAutoRead() {
-        if (producers != null) {
-            producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
-        }
+        producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
     }
 
     @Override
     public void enableCnxAutoRead() {
-        if (producers != null) {
-            producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
-        }
+        producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
     }
 
     protected boolean hasLocalProducers() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e43385b..1b855f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,7 +26,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
@@ -67,7 +66,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -232,7 +230,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private final ScheduledExecutorService inactivityMonitor;
     private final ScheduledExecutorService messageExpiryMonitor;
     private final ScheduledExecutorService compactionMonitor;
-    private final ScheduledExecutorService messagePublishBufferMonitor;
     private final ScheduledExecutorService consumedLedgersMonitor;
     private ScheduledExecutorService topicPublishRateLimiterMonitor;
     private ScheduledExecutorService brokerPublishRateLimiterMonitor;
@@ -269,18 +266,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private Channel listenChannelTls;
 
     private boolean preciseTopicPublishRateLimitingEnable;
-    private final long maxMessagePublishBufferBytes;
-    private final long resumeProducerReadMessagePublishBufferBytes;
-    private volatile boolean reachMessagePublishBufferThreshold;
+    private final LongAdder pausedConnections = new LongAdder();
     private BrokerInterceptor interceptor;
 
     private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
 
     public BrokerService(PulsarService pulsar) throws Exception {
         this.pulsar = pulsar;
-        this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0
-                ? pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1;
-        this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
         this.preciseTopicPublishRateLimitingEnable =
                 pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
         this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
@@ -322,9 +314,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         this.compactionMonitor =
                 Executors.newSingleThreadScheduledExecutor(
                         new DefaultThreadFactory("pulsar-compaction-monitor"));
-        this.messagePublishBufferMonitor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
         this.consumedLedgersMonitor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
 
@@ -474,7 +463,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         this.startInactivityMonitor();
         this.startMessageExpiryMonitor();
         this.startCompactionMonitor();
-        this.startMessagePublishBufferMonitor();
         this.startConsumedLedgersMonitor();
         this.startBacklogQuotaChecker();
         this.updateBrokerPublisherThrottlingMaxRate();
@@ -552,14 +540,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
-    protected void startMessagePublishBufferMonitor() {
-        int interval = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMillis();
-        if (interval > 0 && maxMessagePublishBufferBytes > 0) {
-            messagePublishBufferMonitor.scheduleAtFixedRate(safeRun(this::checkMessagePublishBuffer),
-                                                            interval, interval, TimeUnit.MILLISECONDS);
-        }
-    }
-
     protected void startConsumedLedgersMonitor() {
         int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
         if (interval > 0) {
@@ -757,7 +737,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                                                 inactivityMonitor,
                                                 messageExpiryMonitor,
                                                 compactionMonitor,
-                                                messagePublishBufferMonitor,
                                                 consumedLedgersMonitor,
                                                 backlogQuotaChecker,
                                                 topicOrderedExecutor,
@@ -2487,22 +2466,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
-    @VisibleForTesting
-    void checkMessagePublishBuffer() {
-        AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
-        foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
-        if (currentMessagePublishBufferBytes.get() >= maxMessagePublishBufferBytes
-            && !reachMessagePublishBufferThreshold) {
-            reachMessagePublishBufferThreshold = true;
-            forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead());
-        }
-        if (currentMessagePublishBufferBytes.get() < resumeProducerReadMessagePublishBufferBytes
-            && reachMessagePublishBufferThreshold) {
-            reachMessagePublishBufferThreshold = false;
-            forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting());
-        }
-    }
-
     private void foreachCnx(Consumer<TransportCnx> consumer) {
         Set<TransportCnx> cnxSet = new HashSet<>();
         topics.forEach((n, t) -> {
@@ -2512,17 +2475,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         cnxSet.forEach(consumer);
     }
 
-    public boolean isReachMessagePublishBufferThreshold() {
-        return reachMessagePublishBufferThreshold;
-    }
-
-    @VisibleForTesting
-    long getCurrentMessagePublishBufferSize() {
-        AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
-        foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
-        return currentMessagePublishBufferBytes.get();
-    }
-
     public boolean isAllowAutoTopicCreation(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return isAllowAutoTopicCreation(topicName);
@@ -2691,7 +2643,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public boolean isBrokerEntryMetadataEnabled() {
-        return brokerEntryMetadataInterceptors.size() > 0;
+        return !brokerEntryMetadataInterceptors.isEmpty();
     }
 
+    public void pausedConnections(int numberOfConnections) {
+        pausedConnections.add(numberOfConnections);
+    }
+
+    public void resumedConnections(int numberOfConnections) {
+        pausedConnections.add(-numberOfConnections);
+    }
+
+    public long getPausedConnections() {
+        return pausedConnections.longValue();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 09257f0..40a5948 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -32,10 +32,13 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.Promise;
 import io.prometheus.client.Gauge;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -43,7 +46,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
@@ -56,7 +58,10 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -182,16 +187,33 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     // Flag to manage throttling-rate by atomically enable/disable read-channel.
     private volatile boolean autoReadDisabledRateLimiting = false;
     private FeatureFlags features;
-    // Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
-    private volatile boolean autoReadDisabledPublishBufferLimiting = false;
-    private static final AtomicLongFieldUpdater<ServerCnx> MSG_PUBLISH_BUFFER_SIZE_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ServerCnx.class, "messagePublishBufferSize");
-    private volatile long messagePublishBufferSize = 0;
+
     private PulsarCommandSender commandSender;
 
     private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta()
             .setKeySharedMode(KeySharedMode.AUTO_SPLIT);
 
+    // Flag tracking whether auto-read on this channel is currently disabled by publish-buffer throttling.
+    private boolean autoReadDisabledPublishBufferLimiting = false;
+    private final long maxPendingBytesPerThread;
+    private final long resumeThresholdPendingBytesPerThread;
+
+    // Number of bytes pending to be published from a single specific IO thread.
+    private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
+        @Override
+        protected MutableLong initialValue() throws Exception {
+            return new MutableLong();
+        }
+    };
+
+    // A set of connections tied to the current thread
+    private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>() {
+        @Override
+        protected Set<ServerCnx> initialValue() throws Exception {
+            return Collections.newSetFromMap(new IdentityHashMap<>());
+        }
+    };
+
     enum State {
         Start, Connected, Failed, Connecting
     }
@@ -201,22 +223,26 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         this.service = pulsar.getBrokerService();
         this.schemaService = pulsar.getSchemaRegistryService();
         this.state = State.Start;
+        ServiceConfiguration conf = pulsar.getConfiguration();
 
         // This maps are not heavily contended since most accesses are within the cnx thread
         this.producers = new ConcurrentLongHashMap<>(8, 1);
         this.consumers = new ConcurrentLongHashMap<>(8, 1);
-        this.replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix();
-        this.maxNonPersistentPendingMessages = service.pulsar().getConfiguration()
-                .getMaxConcurrentNonPersistentMessagePerConnection();
-        this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
-        this.authenticateOriginalAuthData = service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
-        this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
-        this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
-        this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishRequestsPerConnection();
+        this.replicatorPrefix = conf.getReplicatorPrefix();
+        this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
+        this.proxyRoles = conf.getProxyRoles();
+        this.authenticateOriginalAuthData = conf.isAuthenticateOriginalAuthData();
+        this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
+        this.maxMessageSize = conf.getMaxMessageSize();
+        this.maxPendingSendRequests = conf.getMaxPendingPublishRequestsPerConnection();
         this.resumeReadsThreshold = maxPendingSendRequests / 2;
-        this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
-        this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
-        this.encryptionRequireOnProducer = pulsar.getConfiguration().isEncryptionRequireOnProducer();
+        this.preciseDispatcherFlowControl = conf.isPreciseDispatcherFlowControl();
+        this.preciseTopicPublishRateLimitingEnable = conf.isPreciseTopicPublishRateLimiterEnable();
+        this.encryptionRequireOnProducer = conf.isEncryptionRequireOnProducer();
+        // Assign a portion of max-pending bytes to each IO thread
+        this.maxPendingBytesPerThread = conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
+                / conf.getNumIOThreads();
+        this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
     }
 
     @Override
@@ -226,6 +252,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         this.ctx = ctx;
         this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
         this.service.getPulsarStats().recordConnectionCreate();
+        cnxsPerThread.get().add(this);
     }
 
     @Override
@@ -237,6 +264,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         if (brokerInterceptor != null) {
             brokerInterceptor.onConnectionClosed(this);
         }
+
+        cnxsPerThread.get().remove(this);
+
         // Connection is gone, close the producers immediately
         producers.values().forEach((producerFuture) -> {
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
@@ -2149,7 +2179,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             .register();
 
     public void startSendOperation(Producer producer, int msgSize, int numMessages) {
-        MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
         boolean isPublishRateExceeded = false;
         if (preciseTopicPublishRateLimitingEnable) {
             boolean isPreciseTopicPublishRateExceeded =
@@ -2170,22 +2199,43 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             autoReadDisabledRateLimiting = isPublishRateExceeded;
             throttledConnections.inc();
         }
-        if (getBrokerService().isReachMessagePublishBufferThreshold()) {
-            ctx.channel().config().setAutoRead(false);
-            autoReadDisabledPublishBufferLimiting = true;
-            throttledConnectionsGlobal.inc();
+
+        if (pendingBytesPerThread.get().addAndGet(msgSize) >= maxPendingBytesPerThread
+                && !autoReadDisabledPublishBufferLimiting
+                && maxPendingBytesPerThread > 0) {
+            // Disable reading from all the connections associated with this thread
+            MutableInt pausedConnections = new MutableInt();
+            cnxsPerThread.get().forEach(cnx -> {
+                if (cnx.hasProducers() && !cnx.autoReadDisabledPublishBufferLimiting) {
+                    cnx.disableCnxAutoRead();
+                    cnx.autoReadDisabledPublishBufferLimiting = true;
+                    pausedConnections.increment();
+                }
+            });
+
+            getBrokerService().pausedConnections(pausedConnections.intValue());
         }
     }
 
     @Override
     public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
-        MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
-        if (--pendingSendRequest == resumeReadsThreshold && !ctx.channel().config().isAutoRead()) {
-            // Resume reading from socket
-            ctx.channel().config().setAutoRead(true);
-            // triggers channel read if autoRead couldn't trigger it
-            ctx.read();
-            throttledConnections.dec();
+        if (pendingBytesPerThread.get().addAndGet(-msgSize) < resumeThresholdPendingBytesPerThread
+                && autoReadDisabledPublishBufferLimiting) {
+            // Re-enable reading on all the blocked connections
+            MutableInt resumedConnections = new MutableInt();
+            cnxsPerThread.get().forEach(cnx -> {
+                if (cnx.autoReadDisabledPublishBufferLimiting) {
+                    cnx.autoReadDisabledPublishBufferLimiting = false;
+                    cnx.enableCnxAutoRead();
+                    resumedConnections.increment();
+                }
+            });
+
+            getBrokerService().resumedConnections(resumedConnections.intValue());
+        }
+
+        if (--pendingSendRequest == resumeReadsThreshold) {
+            enableCnxAutoRead();
         }
         if (isNonPersistentTopic) {
             nonPersistentPendingMessages--;
@@ -2203,6 +2253,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             ctx.channel().config().setAutoRead(true);
             // triggers channel read
             ctx.read();
+            throttledConnections.dec();
         }
     }
 
@@ -2351,16 +2402,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         return clientVersion;
     }
 
-    @Override
-    public long getMessagePublishBufferSize() {
-        return this.messagePublishBufferSize;
-    }
-
-    @VisibleForTesting
-    void setMessagePublishBufferSize(long bufferSize) {
-        this.messagePublishBufferSize = bufferSize;
-    }
-
     @VisibleForTesting
     void setAutoReadDisabledRateLimiting(boolean isLimiting) {
         this.autoReadDisabledRateLimiting = isLimiting;
@@ -2432,10 +2473,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
         if (ex instanceof AuthenticationException) {
             log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
-                     remoteAddress, operation, principal, topicString, ex.getMessage());
+                    remoteAddress, operation, principal, topicString, ex.getMessage());
         } else {
             log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
-                      remoteAddress, operation, principal, topicString, ex);
+                    remoteAddress, operation, principal, topicString, ex);
         }
     }
+
+    public boolean hasProducers() {
+        return !producers.isEmpty();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index af1b42e..cc0c559 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -54,8 +54,6 @@ public interface TransportCnx {
 
     void closeProducer(Producer producer);
 
-    long getMessagePublishBufferSize();
-
     void cancelPublishRateLimiting();
 
     void cancelPublishBufferLimiting();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index c8b94eb..739a7c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -48,75 +51,64 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
     @Test
     public void testMessagePublishBufferThrottleDisabled() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(-1);
-        conf.setMessagePublishBufferCheckIntervalInMillis(10);
         super.baseSetup();
         final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled";
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .producerName("producer-name")
                 .create();
-        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
-        Assert.assertNotNull(topicRef);
-        TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
-        ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
-        Thread.sleep(20);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
-        // Make sure the producer can publish succeed.
+         assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+         mockBookKeeper.addEntryDelay(1, TimeUnit.SECONDS);
+
+        // Make sure the producer can publish successfully
+        byte[] payload = new byte[1024 * 1024];
         for (int i = 0; i < 10; i++) {
-            futures.add(producer.sendAsync(new byte[1024 * 1024]));
+            producer.sendAsync(payload);
         }
-        FutureUtil.waitForAll(futures).get();
-        for (CompletableFuture<MessageId> future : futures) {
-            Assert.assertNotNull(future.get());
-        }
-        Thread.sleep(20);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+        producer.flush();
+
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
     }
 
     @Test
     public void testMessagePublishBufferThrottleEnable() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(1);
-        conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
         super.baseSetup();
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
         final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .producerName("producer-name")
                 .create();
-        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
-        Assert.assertNotNull(topicRef);
-        TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
-        ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        // The first message can publish success, but the second message should be blocked
-        producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
-        getPulsar().getBrokerService().checkMessagePublishBuffer();
-        Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-
-        ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
-        getPulsar().getBrokerService().checkMessagePublishBuffer();
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
-        // Make sure the producer can publish succeed.
+
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+        mockBookKeeper.addEntryDelay(1, TimeUnit.SECONDS);
+
+        byte[] payload = new byte[1024 * 1024];
         for (int i = 0; i < 10; i++) {
-            futures.add(producer.sendAsync(new byte[1024 * 1024]));
-        }
-        FutureUtil.waitForAll(futures).get();
-        for (CompletableFuture<MessageId> future : futures) {
-            Assert.assertNotNull(future.get());
+            producer.sendAsync(payload);
         }
+
         Awaitility.await().untilAsserted(
-                () -> Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L));
+                () -> Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 1L));
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+
+        producer.flush();
+
+        Awaitility.await().untilAsserted(
+            () -> Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 0L));
+
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
     }
 
     @Test
     public void testBlockByPublishRateLimiting() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(1);
-        conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
         super.baseSetup();
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
         final String topic = "persistent://prop/ns-abc/testBlockByPublishRateLimiting";
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
@@ -124,36 +116,38 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
                 .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
-        TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx();
-        ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+        mockBookKeeper.addEntryDelay(5, TimeUnit.SECONDS);
 
         // Block by publish buffer.
-        getPulsar().getBrokerService().checkMessagePublishBuffer();
-        Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+        byte[] payload = new byte[1024 * 1024];
+        for (int i = 0; i < 10; i++) {
+            producer.sendAsync(payload);
+        }
+
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+
+        CompletableFuture<Void> flushFuture = producer.flushAsync();
 
         // Block by publish rate.
-        ((ServerCnx) cnx).setMessagePublishBufferSize(0L);
-        getPulsar().getBrokerService().checkMessagePublishBuffer();
-        ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(true);
-        cnx.disableCnxAutoRead();
-        cnx.enableCnxAutoRead();
+        // After 1 second, the message buffer throttling will be lifted, but the rate limiting will still be in place.
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+        try {
+            flushFuture.get(2, TimeUnit.SECONDS);
+            fail("Should have timed out");
+        } catch (TimeoutException e) {
+            // Ok
+        }
+
+        flushFuture.join();
+
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
 
         // Resume message publish.
-        ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(false);
-        cnx.enableCnxAutoRead();
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
-        // Make sure the producer can publish succeed.
-        for (int i = 0; i < 10; i++) {
-            futures.add(producer.sendAsync(new byte[1024 * 1024]));
-        }
-        FutureUtil.waitForAll(futures).get();
-        for (CompletableFuture<MessageId> future : futures) {
-            Assert.assertNotNull(future.get());
-        }
-        Awaitility.await().untilAsserted(
-                () -> Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L));
+        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+
+        flushFuture.get();
+        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
     }
 }
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index ce71e5f..e646049 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,10 +18,24 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import com.google.common.collect.Lists;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
@@ -33,7 +47,6 @@ import org.apache.bookkeeper.client.impl.OpenBuilderBase;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
-import org.apache.bookkeeper.discover.ZKRegistrationClient;
 import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
@@ -80,6 +93,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
         return ensemble;
     }
 
+    Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
     List<CompletableFuture<Void>> failures = new ArrayList<>();
 
     public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) throws Exception {
@@ -330,6 +344,10 @@ public class PulsarMockBookKeeper extends BookKeeper {
         return promise;
     }
 
+    public synchronized void addEntryDelay(long delay, TimeUnit unit) {
+        addEntryDelaysMillis.add(unit.toMillis(delay));
+    }
+
     static int getExceptionCode(Throwable t) {
         if (t instanceof BKException) {
             return ((BKException) t).getCode();
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 3031532..8a62e42 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -168,8 +168,13 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     @Override
     public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) {
         bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                Long delayMillis = bk.addEntryDelaysMillis.poll();
+                if (delayMillis == null) {
+                    delayMillis = 1L;
+                }
+
                 try {
-                    Thread.sleep(1);
+                    Thread.sleep(delayMillis);
                 } catch (InterruptedException e) {
                 }