Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2021/08/06 20:03:31 UTC

[GitHub] [cassandra] josh-mckenzie commented on a change in pull request #1045: CASSANDRA-16663 Request-Based Native Transport Rate-Limiting

josh-mckenzie commented on a change in pull request #1045:
URL: https://github.com/apache/cassandra/pull/1045#discussion_r684458398



##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.math.LongMath;
+
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter
+{
+    @GuardedBy("this")
+    private Stopwatch stopwatch;
+
+    @GuardedBy("this")
+    private SmoothRateLimiter delegate;
+
+    private NoWaitRateLimiter(SmoothRateLimiter delegate, Ticker ticker)
+    {
+        this.delegate = delegate;
+        this.stopwatch = Stopwatch.createStarted(ticker);
+    }
+    
+    public static NoWaitRateLimiter create(double permitsPerSecond)
+    {
+        return create(permitsPerSecond, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, Ticker ticker)
+    {
+        return create(permitsPerSecond, 1.0D, ticker);
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds)
+    {
+        return create(permitsPerSecond, burstSeconds, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        SmoothRateLimiter delegate = new SmoothRateLimiter(burstSeconds);
+        NoWaitRateLimiter limiter = new NoWaitRateLimiter(delegate, ticker);
+        limiter.setRate(permitsPerSecond);
+        return limiter;
+    }
+    
+    @VisibleForTesting
+    public synchronized void reset(double permitsPerSecond, double burstSeconds, Ticker ticker)

Review comment:
       If this method is meant for test use only, consider renaming it to `resetForTest`.

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.math.LongMath;
+
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter

Review comment:
       Did we consider specializing [SmoothRateLimiter](https://github.com/google/guava/blob/v30.1.1/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java) from Guava before deciding to go our own way?
   
   It looks like [SmoothBursty](https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java#L353) might be a match for what we're doing here, given an appropriately aggressive immediate permit grant in place of the delay behavior.
   
   From a stylistic perspective, I prefer their heavier class- and method-level Javadoc, which gives the reader a cognitive framework for understanding how the rate limiter is structured. We may be able to pull some of it from there and tune it to better reflect our behavior.
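
For reference, the bursty behavior mentioned above comes down to crediting permits for idle time, capped at `maxBurstSeconds * permitsPerSecond`. Below is a minimal sketch of that accrual step, assuming a `resync` in the style of Guava's limiter. The class name `BurstyPermitSketch` and its accessor are hypothetical and are not the PR's code.

```java
/** Hypothetical sketch: stored permits accrue while idle, up to a burst cap. */
final class BurstyPermitSketch
{
    private static final double MICROS_PER_SECOND = 1_000_000.0;

    private final double maxPermits;           // burst cap = maxBurstSeconds * rate
    private final double stableIntervalMicros; // time between permits at the stable rate
    private double storedPermits = 0.0;
    private long nextFreeTicketMicros = 0L;

    BurstyPermitSketch(double permitsPerSecond, double maxBurstSeconds)
    {
        this.stableIntervalMicros = MICROS_PER_SECOND / permitsPerSecond;
        this.maxPermits = maxBurstSeconds * permitsPerSecond;
    }

    /** Credits permits for the idle time since the next free ticket, up to the cap. */
    void resync(long nowMicros)
    {
        if (nowMicros > nextFreeTicketMicros)
        {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
    }

    double storedPermits()
    {
        return storedPermits;
    }

    public static void main(String[] args)
    {
        BurstyPermitSketch sketch = new BurstyPermitSketch(10.0, 1.0);
        sketch.resync(500_000L);   // half a second idle
        System.out.println(sketch.storedPermits());
        sketch.resync(5_000_000L); // long idle period, accrual stops at the cap
        System.out.println(sketch.storedPermits());
    }
}
```

Class-level Javadoc in this spirit (what accrues, what the cap is, what moves `nextFreeTicketMicros`) is the kind of framing I'd like to see on `NoWaitRateLimiter`.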
    

##########
File path: src/java/org/apache/cassandra/transport/Dispatcher.java
##########
@@ -65,20 +73,38 @@ public Dispatcher(boolean useLegacyFlusher)
         this.useLegacyFlusher = useLegacyFlusher;
     }
 
-    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher)
+    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
     {
-        requestExecutor.submit(() -> processRequest(channel, request, forFlusher));
+        requestExecutor.submit(() -> processRequest(channel, request, forFlusher, backpressure));
     }
 
     /**
      * Note: this method may be executed on the netty event loop, during initial protocol negotiation
      */
-    static Message.Response processRequest(ServerConnection connection, Message.Request request)
+    static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure)
     {
         long queryStartNanoTime = System.nanoTime();
         if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
             ClientWarn.instance.captureWarnings();
 
+        if (backpressure == Overload.REQUESTS)
+        {
+            String message = String.format("Request breached global limit of %.2f requests/second and triggered backpressure.",
+                                           ClientResourceLimits.getNativeTransportRequestsPerSecond());
+            
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
+            ClientWarn.instance.warn(message);
+        }
+        else if (backpressure == Overload.BYTES_IN_FLIGHT)
+        {
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES,

Review comment:
       nit: is there a reason we didn't do the same here as above, i.e. construct the string once and reuse it for both the log and the client warning?
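
Roughly what I have in mind, as a self-contained sketch. The two lists are hypothetical stand-ins for `NoSpamLogger` and `ClientWarn`, and the message wording is an assumption rather than the text used in the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: format the overload message once and send it to both sinks. */
final class OverloadMessageSketch
{
    // Stand-ins for the server log and the client warning channel.
    final List<String> serverLog = new ArrayList<>();
    final List<String> clientWarnings = new ArrayList<>();

    void reportBytesInFlightOverload(long limitBytes)
    {
        // Built once, so the log line and the client warning cannot drift apart.
        String message = String.format("Request breached limit of %d bytes in flight and triggered backpressure.",
                                       limitBytes);
        serverLog.add(message);
        clientWarnings.add(message);
    }

    public static void main(String[] args)
    {
        OverloadMessageSketch sketch = new OverloadMessageSketch();
        sketch.reportBytesInFlightOverload(1024L);
        System.out.println(sketch.serverLog.get(0));
    }
}
```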

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.math.LongMath;
+
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter
+{
+    @GuardedBy("this")
+    private Stopwatch stopwatch;
+
+    @GuardedBy("this")
+    private SmoothRateLimiter delegate;
+
+    private NoWaitRateLimiter(SmoothRateLimiter delegate, Ticker ticker)
+    {
+        this.delegate = delegate;
+        this.stopwatch = Stopwatch.createStarted(ticker);
+    }
+    
+    public static NoWaitRateLimiter create(double permitsPerSecond)
+    {
+        return create(permitsPerSecond, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, Ticker ticker)
+    {
+        return create(permitsPerSecond, 1.0D, ticker);
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds)
+    {
+        return create(permitsPerSecond, burstSeconds, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        SmoothRateLimiter delegate = new SmoothRateLimiter(burstSeconds);
+        NoWaitRateLimiter limiter = new NoWaitRateLimiter(delegate, ticker);
+        limiter.setRate(permitsPerSecond);
+        return limiter;
+    }
+    
+    @VisibleForTesting
+    public synchronized void reset(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        this.stopwatch = Stopwatch.createStarted(ticker);
+        this.delegate = new SmoothRateLimiter(burstSeconds);
+        setRate(permitsPerSecond);
+    }
+
+    public final void setRate(double permitsPerSecond) 
+    {
+        Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+        
+        synchronized (this) 
+        {
+            delegate.doSetRate(permitsPerSecond, stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    public synchronized double getRate() 
+    {
+        return delegate.getRate();
+    }
+
+    /**
+     * Forces the acquisition of a single permit.
+     * 
+     * @return microseconds until the acquired permit is available, and zero if it already is
+     */
+    public synchronized long reserveAndGetWaitLength() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        long momentAvailable = delegate.reserveEarliestAvailable(1, nowMicros);
+        return Math.max(momentAvailable - nowMicros, 0L);
+    }
+
+    public synchronized boolean tryAcquire()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+
+        if (!delegate.canAcquire(nowMicros)) 
+        {
+            return false;
+        }
+
+        delegate.reserveEarliestAvailable(1, nowMicros);
+            
+        return true;
+    }
+
+    public synchronized boolean canAcquire() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.canAcquire(nowMicros);
+    }
+
+    public synchronized long waitTimeMicros()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.waitTimeMicros(nowMicros);
+    }
+
+    @VisibleForTesting
+    public synchronized long getNextFreeTicketMicros()
+    {
+        return delegate.nextFreeTicketMicros;
+    }
+
+    public synchronized double getStoredPermits()
+    {
+        return delegate.getStoredPermits();
+    }
+    
+    @Override
+    public String toString()
+    {
+        return String.format("Maximum requests/second is %.2f. %.2f stored permits available.", 
+                             getRate(), getStoredPermits());
+    }
+
+    @NotThreadSafe
+    private static class SmoothRateLimiter
+    {
+        private double storedPermits;
+        private double maxPermits;
+        private double stableIntervalMicros;
+        private long nextFreeTicketMicros;
+        private final double maxBurstSeconds;
+
+        private SmoothRateLimiter(double maxBurstSeconds)
+        {
+            this.nextFreeTicketMicros = 0L;
+            this.maxBurstSeconds = maxBurstSeconds;
+        }
+        
+        public double getStoredPermits()
+        {
+            return storedPermits;
+        }
+
+        public void doSetRate(double permitsPerSecond, long nowMicros)
+        {
+            resync(nowMicros);
+            this.stableIntervalMicros = (double) TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
+            doSetRate(permitsPerSecond);
+        }
+
+        private void doSetRate(double permitsPerSecond)
+        {
+            double oldMaxPermits = this.maxPermits;
+            this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
+            this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
+        }
+
+        final double getRate()
+        {
+            return (double) TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+        }
+
+        long waitTimeMicros(long nowMicros)
+        {
+            return Math.max(this.nextFreeTicketMicros - nowMicros, 0L);
+        }
+
+        boolean canAcquire(long nowMicros)
+        {
+            return nowMicros >= this.nextFreeTicketMicros;
+        }
+
+        public long reserveEarliestAvailable(int requiredPermits, long nowMicros)

Review comment:
       In our implementation here we remove the [storedPermitsToWaitTime](https://github.com/google/guava/blob/v30.1.1/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java#L360) method; I believe it's because of our intention to always [allow a single greedy overflow on permit allocation](https://github.com/google/guava/blob/88b1bdcb613de9f985cf3f873b7eec0a2e13741b/android/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java#L81-L119) before pausing the client channel.
   
   That said, it might be good to both 1) point to where our inspiration came from, and 2) comment on the key ways in which our impl differs, if that makes sense.
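
To spell out the greedy overflow I'm describing: the caller that exhausts the stored permits is still granted immediately, and the cost is pushed onto `nextFreeTicketMicros` for whoever comes next. The sketch below is an assumption about how the rest of `reserveEarliestAvailable` behaves (only its first lines are quoted above). It treats stored permits as free and leaves out idle-time accrual for brevity. `GreedyReservationSketch` is a hypothetical name.

```java
/** Hypothetical sketch of a reservation that never makes the current caller wait. */
final class GreedyReservationSketch
{
    private final double stableIntervalMicros;
    private double storedPermits;
    private long nextFreeTicketMicros = 0L;

    GreedyReservationSketch(double permitsPerSecond, double initialStoredPermits)
    {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.storedPermits = initialStoredPermits;
    }

    boolean canAcquire(long nowMicros)
    {
        return nowMicros >= nextFreeTicketMicros;
    }

    /** Returns the moment the permits are available; the debt is paid by later callers. */
    long reserve(int requiredPermits, long nowMicros)
    {
        // Simplified resync: move the ticket up to "now" without accruing permits.
        if (nowMicros > nextFreeTicketMicros)
            nextFreeTicketMicros = nowMicros;

        long momentAvailableMicros = nextFreeTicketMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros = (long) (freshPermits * stableIntervalMicros);

        // Stored permits cost nothing; only fresh permits delay the next ticket.
        nextFreeTicketMicros += waitMicros;
        storedPermits -= storedPermitsToSpend;
        return momentAvailableMicros;
    }

    public static void main(String[] args)
    {
        GreedyReservationSketch sketch = new GreedyReservationSketch(1000.0, 1.0);
        System.out.println(sketch.reserve(1, 0L)); // served from storage
        System.out.println(sketch.reserve(1, 0L)); // overflow, still granted at once
        System.out.println(sketch.canAcquire(0L)); // the following caller is held back
    }
}
```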

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.math.LongMath;
+
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter
+{
+    @GuardedBy("this")
+    private Stopwatch stopwatch;
+
+    @GuardedBy("this")
+    private SmoothRateLimiter delegate;
+
+    private NoWaitRateLimiter(SmoothRateLimiter delegate, Ticker ticker)
+    {
+        this.delegate = delegate;
+        this.stopwatch = Stopwatch.createStarted(ticker);
+    }
+    
+    public static NoWaitRateLimiter create(double permitsPerSecond)
+    {
+        return create(permitsPerSecond, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, Ticker ticker)
+    {
+        return create(permitsPerSecond, 1.0D, ticker);
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds)
+    {
+        return create(permitsPerSecond, burstSeconds, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        SmoothRateLimiter delegate = new SmoothRateLimiter(burstSeconds);
+        NoWaitRateLimiter limiter = new NoWaitRateLimiter(delegate, ticker);
+        limiter.setRate(permitsPerSecond);
+        return limiter;
+    }
+    
+    @VisibleForTesting
+    public synchronized void reset(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        this.stopwatch = Stopwatch.createStarted(ticker);
+        this.delegate = new SmoothRateLimiter(burstSeconds);
+        setRate(permitsPerSecond);
+    }
+
+    public final void setRate(double permitsPerSecond) 
+    {
+        Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+        
+        synchronized (this) 
+        {
+            delegate.doSetRate(permitsPerSecond, stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    public synchronized double getRate() 
+    {
+        return delegate.getRate();
+    }
+
+    /**
+     * Forces the acquisition of a single permit.
+     * 
+     * @return microseconds until the acquired permit is available, and zero if it already is
+     */
+    public synchronized long reserveAndGetWaitLength() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        long momentAvailable = delegate.reserveEarliestAvailable(1, nowMicros);
+        return Math.max(momentAvailable - nowMicros, 0L);
+    }
+
+    public synchronized boolean tryAcquire()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+
+        if (!delegate.canAcquire(nowMicros)) 
+        {
+            return false;
+        }
+
+        delegate.reserveEarliestAvailable(1, nowMicros);
+            
+        return true;
+    }
+
+    public synchronized boolean canAcquire() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.canAcquire(nowMicros);
+    }
+
+    public synchronized long waitTimeMicros()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.waitTimeMicros(nowMicros);
+    }
+
+    @VisibleForTesting
+    public synchronized long getNextFreeTicketMicros()
+    {
+        return delegate.nextFreeTicketMicros;
+    }
+
+    public synchronized double getStoredPermits()
+    {
+        return delegate.getStoredPermits();
+    }
+    
+    @Override
+    public String toString()
+    {
+        return String.format("Maximum requests/second is %.2f. %.2f stored permits available.", 
+                             getRate(), getStoredPermits());
+    }
+
+    @NotThreadSafe
+    private static class SmoothRateLimiter
+    {
+        private double storedPermits;
+        private double maxPermits;
+        private double stableIntervalMicros;
+        private long nextFreeTicketMicros;
+        private final double maxBurstSeconds;
+
+        private SmoothRateLimiter(double maxBurstSeconds)
+        {
+            this.nextFreeTicketMicros = 0L;
+            this.maxBurstSeconds = maxBurstSeconds;
+        }
+        
+        public double getStoredPermits()
+        {
+            return storedPermits;
+        }
+
+        public void doSetRate(double permitsPerSecond, long nowMicros)
+        {
+            resync(nowMicros);

Review comment:
       Having two signatures for doSetRate, one of which takes a timestamp and _mutates internal state of the timer_ vs. one that just recalculates stored permits, is pretty confusing to me and seems like it'd be error prone on future use. Just double-checked the guava one and they don't have the signature below (sans nowMicros). Recommend renaming this one to something that indicates it recalculates permits prior to calling doSetRate or renaming the other to indicate it just recalculates permits.
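
One possible shape for the rename, as a sketch only. `resyncAndSetRate` and `rescaleStoredPermits` are hypothetical names, the `resync` body is assumed to follow Guava's, and the arithmetic is copied from the two `doSetRate` overloads in this diff.

```java
/** Hypothetical sketch: distinct names for "resync, then set rate" and "rescale permits". */
final class RateUpdateNamingSketch
{
    private static final double MICROS_PER_SECOND = 1_000_000.0;

    private final double maxBurstSeconds;
    private double storedPermits;
    private double maxPermits;
    private double stableIntervalMicros;
    private long nextFreeTicketMicros = 0L;

    RateUpdateNamingSketch(double maxBurstSeconds)
    {
        this.maxBurstSeconds = maxBurstSeconds;
    }

    /** Mutates clock-related state: resyncs to nowMicros before applying the new rate. */
    void resyncAndSetRate(double permitsPerSecond, long nowMicros)
    {
        resync(nowMicros);
        stableIntervalMicros = MICROS_PER_SECOND / permitsPerSecond;
        rescaleStoredPermits(permitsPerSecond);
    }

    /** Only rescales stored permits to the new maximum; never touches the clock. */
    private void rescaleStoredPermits(double permitsPerSecond)
    {
        double oldMaxPermits = maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        storedPermits = oldMaxPermits == 0.0D ? 0.0D : storedPermits * maxPermits / oldMaxPermits;
    }

    private void resync(long nowMicros)
    {
        if (nowMicros > nextFreeTicketMicros)
        {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
    }

    double storedPermits() { return storedPermits; }

    double maxPermits() { return maxPermits; }

    public static void main(String[] args)
    {
        RateUpdateNamingSketch sketch = new RateUpdateNamingSketch(1.0);
        sketch.resyncAndSetRate(100.0, 0L);
        sketch.resyncAndSetRate(200.0, 500_000L);
        System.out.println(sketch.storedPermits() + " of " + sketch.maxPermits());
    }
}
```

With names like these, a future caller can tell from the signature alone which method advances the limiter's notion of "now".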

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.math.LongMath;
+
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter
+{
+    @GuardedBy("this")
+    private Stopwatch stopwatch;
+
+    @GuardedBy("this")
+    private SmoothRateLimiter delegate;
+
+    private NoWaitRateLimiter(SmoothRateLimiter delegate, Ticker ticker)
+    {
+        this.delegate = delegate;
+        this.stopwatch = Stopwatch.createStarted(ticker);
+    }
+    
+    public static NoWaitRateLimiter create(double permitsPerSecond)
+    {
+        return create(permitsPerSecond, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, Ticker ticker)
+    {
+        return create(permitsPerSecond, 1.0D, ticker);
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds)
+    {
+        return create(permitsPerSecond, burstSeconds, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        SmoothRateLimiter delegate = new SmoothRateLimiter(burstSeconds);
+        NoWaitRateLimiter limiter = new NoWaitRateLimiter(delegate, ticker);
+        limiter.setRate(permitsPerSecond);
+        return limiter;
+    }
+    
+    @VisibleForTesting
+    public synchronized void reset(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        this.stopwatch = Stopwatch.createStarted(ticker);
+        this.delegate = new SmoothRateLimiter(burstSeconds);
+        setRate(permitsPerSecond);
+    }
+
+    public final void setRate(double permitsPerSecond) 
+    {
+        Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+        
+        synchronized (this) 
+        {
+            delegate.doSetRate(permitsPerSecond, stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    public synchronized double getRate() 
+    {
+        return delegate.getRate();
+    }
+
+    /**
+     * Forces the acquisition of a single permit.
+     * 
+     * @return microseconds until the acquired permit is available, and zero if it already is
+     */
+    public synchronized long reserveAndGetWaitLength() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        long momentAvailable = delegate.reserveEarliestAvailable(1, nowMicros);
+        return Math.max(momentAvailable - nowMicros, 0L);
+    }
+
+    public synchronized boolean tryAcquire()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+
+        if (!delegate.canAcquire(nowMicros)) 
+        {
+            return false;
+        }
+
+        delegate.reserveEarliestAvailable(1, nowMicros);
+            
+        return true;
+    }
+
+    public synchronized boolean canAcquire() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.canAcquire(nowMicros);
+    }
+
+    public synchronized long waitTimeMicros()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.waitTimeMicros(nowMicros);
+    }
+
+    @VisibleForTesting
+    public synchronized long getNextFreeTicketMicros()
+    {
+        return delegate.nextFreeTicketMicros;
+    }
+
+    public synchronized double getStoredPermits()
+    {
+        return delegate.getStoredPermits();
+    }
+    
+    @Override
+    public String toString()
+    {
+        return String.format("Maximum requests/second is %.2f. %.2f stored permits available.", 
+                             getRate(), getStoredPermits());
+    }
+
+    @NotThreadSafe
+    private static class SmoothRateLimiter
+    {
+        private double storedPermits;
+        private double maxPermits;
+        private double stableIntervalMicros;
+        private long nextFreeTicketMicros;
+        private final double maxBurstSeconds;
+
+        private SmoothRateLimiter(double maxBurstSeconds)
+        {
+            this.nextFreeTicketMicros = 0L;
+            this.maxBurstSeconds = maxBurstSeconds;
+        }
+        
+        public double getStoredPermits()
+        {
+            return storedPermits;
+        }
+
+        public void doSetRate(double permitsPerSecond, long nowMicros)
+        {
+            resync(nowMicros);
+            this.stableIntervalMicros = (double) TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
+            doSetRate(permitsPerSecond);
+        }
+
+        private void doSetRate(double permitsPerSecond)
+        {
+            double oldMaxPermits = this.maxPermits;
+            this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
+            this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
+        }
+
+        final double getRate()
+        {
+            return (double) TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+        }
+
+        long waitTimeMicros(long nowMicros)
+        {
+            return Math.max(this.nextFreeTicketMicros - nowMicros, 0L);
+        }
+
+        boolean canAcquire(long nowMicros)
+        {
+            return nowMicros >= this.nextFreeTicketMicros;
+        }
+
+        public long reserveEarliestAvailable(int requiredPermits, long nowMicros)
+        {
+            resync(nowMicros);
+            long momentAvailableMicros = this.nextFreeTicketMicros;
+            double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
+            double freshPermits = (double) requiredPermits - storedPermitsToSpend;
+            long waitMicros = (long) (freshPermits * this.stableIntervalMicros);

Review comment:
       long-winded nit: Given freshPermits can basically be whatever arbitrary number gets passed in as requiredPermits that overflows storedPermitsToSpend, we don't really have anything in the call pipeline to prevent single large queries from heavily starving a single connection. While things are hard-coded to 1 for now, I see this as being something somewhat brittle in the pipeline for this API; might be worth either commenting on this at call sites, at method header here, or putting something in the code-base to indicate not just a max rate allowable per endpoint but perhaps a max permit request size and cap on that?
   
   I think that'd be a YAGNI violation to put in today, but at least commenting to that effect might help if someone else is working on this implementation in the future.
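
If someone does pick this up later, a cap could look something like the sketch below. Everything here is hypothetical, including the `MAX_PERMITS_PER_REQUEST` constant and its value. The wait calculation mirrors the `freshPermits` arithmetic quoted above.

```java
/** Hypothetical sketch: bound how many permits a single request may reserve. */
final class PermitRequestCapSketch
{
    // Assumed cap, chosen only for illustration.
    static final int MAX_PERMITS_PER_REQUEST = 16;

    static int checkPermitRequest(int requiredPermits)
    {
        if (requiredPermits < 1 || requiredPermits > MAX_PERMITS_PER_REQUEST)
            throw new IllegalArgumentException("requested permits must be in [1, " + MAX_PERMITS_PER_REQUEST
                                               + "], got " + requiredPermits);
        return requiredPermits;
    }

    /** Delay added to the next ticket by a request of the given (capped) size. */
    static long waitMicrosFor(int requiredPermits, double storedPermits, double stableIntervalMicros)
    {
        int permits = checkPermitRequest(requiredPermits);
        double freshPermits = permits - Math.min(permits, storedPermits);
        return (long) (freshPermits * stableIntervalMicros);
    }

    public static void main(String[] args)
    {
        System.out.println(waitMicrosFor(1, 0.0, 1000.0));
        System.out.println(waitMicrosFor(16, 4.0, 1000.0));
    }
}
```

The cap bounds the worst-case delay a single reservation can impose on the next caller at `MAX_PERMITS_PER_REQUEST * stableIntervalMicros`.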

##########
File path: test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
##########
@@ -190,7 +199,9 @@ public int encodedSize(QueryMessage queryMessage, ProtocolVersion version)
         AtomicBoolean measure = new AtomicBoolean(false);
         DescriptiveStatistics stats = new DescriptiveStatistics();
         Lock lock = new ReentrantLock();
-
+        RateLimiter limiter = RateLimiter.create(2000);

Review comment:
       If we're testing perf here, should we be using the NoWaitRateLimiter instead of the guava RateLimiter?

##########
File path: test/unit/org/apache/cassandra/utils/NoWaitRateLimiterTest.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Ticker;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NoWaitRateLimiterTest
+{
+    @Test
+    public void shouldProperlyInitializeRate()
+    {
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(1000.0);
+        assertEquals(1000.0, limiter.getRate(), 0.0);
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldAdvanceWaitTime()
+    {
+        Ticker ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return 0;
+            }
+        };
+
+        double permitsPerSecond = 1000.0;
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(permitsPerSecond, 0.0, ticker);
+        assertEquals(0.0, limiter.waitTimeMicros(), 0.0);
+        limiter.reserveAndGetWaitLength();
+        assertEquals(permitsPerSecond, limiter.waitTimeMicros(), 0.0);
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldAdvanceNextFreeTicketMicrosWithZeroBurst()
+    {
+        double permitsPerSecond = 1000.0;
+        
+        Ticker ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return 0;
+            }
+        };
+        
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(permitsPerSecond, 0.0, ticker);
+        long start = limiter.getNextFreeTicketMicros();
+        limiter.reserveAndGetWaitLength();
+        assertEquals(permitsPerSecond, limiter.getNextFreeTicketMicros() - start, 0.0);
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldNotAcquireWhenPermitsExhaustedAndBeforeNextTicket()
+    {
+        double permitsPerSecond = 1000.0;
+        final AtomicLong tick = new AtomicLong(0);
+
+        Ticker ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return tick.get();
+            }
+        };
+
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(permitsPerSecond, 0.0, ticker);
+        assertTrue(limiter.tryAcquire());
+        assertFalse(limiter.canAcquire());
+        
+        // Advance the clock to the next ticket time, and verify we can acquire again:
+        tick.addAndGet(TimeUnit.MICROSECONDS.toNanos((long) permitsPerSecond));
+        assertTrue(limiter.tryAcquire());
+        assertFalse(limiter.canAcquire());
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldNotAcquireAfterConumingAllStoredPermits()

Review comment:
       nit: spelling. Should be `shouldNotAcquireAfterConsumingAllStoredPermits`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


