You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/12/10 21:50:02 UTC

[pulsar] 01/02: [Issue #11351] Parallel Precise Publish Rate Limiting Fix (#11372)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 46af23e3fb54675764f14153bb52303649b5b8c2
Author: Ron Farkash <ro...@gmail.com>
AuthorDate: Thu Jul 29 18:22:27 2021 +0300

    [Issue #11351] Parallel Precise Publish Rate Limiting Fix (#11372)
    
    ## Master Issue: <https://github.com/apache/pulsar/issues/11351>
    
    ### Motivation
    Hello, as far as I'm concerned it is well known that precise publish rate limiting does not function well. I believe my PR fixes problem number 3 stated in the issue above.
    
    @danielsinai:
    "3. Rate limit function passed only to the msg/s rate limiter (and that's in order to avoid calling it twice)"
    
    It was passed to message rate limiter only due to the fact that there was no implementation of a way to throttle the connection whenever only **one of the limiters was exceeded**.
    
    This PR will allow both message rate & byte rate to co-exist, limit and enable socket reading only when necessary.
    
    ### Modifications
    
    - _tryAcquire_ function in **PublishRateLimiterDisable** will return true. If publish rate was null, this function would get called and return false, thus throttling the client for no reason. If the publish rate is null, it means it was not set by anyone so there's no reason to throttle any connection.
    ```java
    public boolean tryAcquire(int numbers, long bytes) {
            return true;
    }
    ```
    - **RateLimiter** _permits_ and _acquiredPermits_ were changed to volatile.
    ```java
     private volatile long permits;
     private volatile long acquiredPermits;
    ```
    in order to allow reading access from multiple threads at the same time.
    also the removal of _synchronized_ keyword from _getAvailablePermits()_ function.
    ```java
    public long getAvailablePermits() {
            return Math.max(0, this.permits - this.acquiredPermits);
    }
    ```
    **This is required, since a thread dead lock will happen if not.**
    
    - Created ~a HashMap to manage the byte and message rate limiters, and~ a function _releaseThrottle()_ to handle the auto read enable.
    If one of the rate limiters has no available permits we will not re-enable the auto read from the socket.
    
    (cherry picked from commit 7f2ca8f56d0928fc54eea4442f8244a704747255)
---
 .../pulsar/broker/service/PrecisPublishLimiter.java      | 16 +++++++++++++---
 .../java/org/apache/pulsar/common/util/RateLimiter.java  |  6 +++---
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
index e306757..016cf03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
@@ -63,7 +63,14 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
     public boolean isPublishRateExceeded() {
         return false;
     }
-
+    // If all rate limiters are not exceeded, re-enable auto read from socket.
+    private void tryReleaseConnectionThrottle() {
+        if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
+        || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
+            return;
+        }
+        this.rateLimitFunction.apply();
+    }
 
     @Override
     public void update(Policies policies, String clusterName) {
@@ -80,10 +87,13 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
             this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
             if (this.publishMaxMessageRate > 0) {
                 topicPublishRateLimiterOnMessage =
-                        new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true);
+                        new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
+                                this::tryReleaseConnectionThrottle, true);
             }
             if (this.publishMaxByteRate > 0) {
-                topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true);
+                topicPublishRateLimiterOnByte =
+                        new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
+                                this::tryReleaseConnectionThrottle, true);
             }
         } else {
             this.publishMaxMessageRate = 0;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 7e37f4a..a835178 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -56,8 +56,8 @@ public class RateLimiter implements AutoCloseable{
     private TimeUnit timeUnit;
     private final boolean externalExecutor;
     private ScheduledFuture<?> renewTask;
-    private long permits;
-    private long acquiredPermits;
+    private volatile long permits;
+    private volatile long acquiredPermits;
     private boolean isClosed;
     // permitUpdate helps to update permit-rate at runtime
     private Supplier<Long> permitUpdater;
@@ -219,7 +219,7 @@ public class RateLimiter implements AutoCloseable{
      *
      * @return returns 0 if permits is not available
      */
-    public synchronized long getAvailablePermits() {
+    public long getAvailablePermits() {
         return Math.max(0, this.permits - this.acquiredPermits);
     }