Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/03 17:23:05 UTC

[GitHub] [kafka] dajac opened a new pull request #9114: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

dajac opened a new pull request #9114:
URL: https://github.com/apache/kafka/pull/9114


   Based on the discussion in https://github.com/apache/kafka/pull/9072, I have put together an alternative approach. This one does the following:
   * Instead of changing the implementation of the Rate to behave like a Token Bucket, it uses two different metrics: the regular Rate and a new Token Bucket. The latter is used to enforce the quota.
   
   The code can still be improved and refactored; I just wanted to get it out quickly to get feedback on the approach.
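The approach in the description can be illustrated with a minimal, self-contained token bucket. This is a sketch under stated assumptions, not the PR's `TokenBucket` class: the class name `TokenBucketSketch`, its constructor, starting with a full bucket, and expressing the rate in units per second are all assumptions. Credits refill continuously at the quota rate up to a maximum burst; each recorded value is subtracted immediately, so the balance can go negative, and the quota counts as exhausted only while it is below zero.

```java
/**
 * Hypothetical, minimal token bucket (not Kafka's TokenBucket class).
 * Credits refill continuously at ratePerSec and never exceed burst.
 */
final class TokenBucketSketch {
    private final double ratePerSec; // refill rate, e.g. the quota bound in units/s
    private final double burst;      // maximum number of credits the bucket can hold
    private double credits;
    private long lastUpdateMs;

    TokenBucketSketch(double ratePerSec, double burst, long nowMs) {
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.credits = burst; // assumption: start with a full bucket
        this.lastUpdateMs = nowMs;
    }

    /** Adds the credits earned since the last update, capped at burst. */
    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastUpdateMs) / 1000.0;
        credits = Math.min(burst, credits + ratePerSec * elapsedSec);
        lastUpdateMs = nowMs;
    }

    /**
     * Consumes value credits; the balance may become negative.
     * A negative value gives credits back, but never above burst.
     */
    void record(double value, long nowMs) {
        refill(nowMs);
        credits = Math.min(burst, credits - value);
    }

    /** Returns the current number of credits after refilling. */
    double measure(long nowMs) {
        refill(nowMs);
        return credits;
    }

    /** The quota is exhausted only while the balance is strictly negative. */
    boolean exhausted(long nowMs) {
        return measure(nowMs) < 0;
    }
}
```

Replaying the sequence from `TokenBucketTest` (rate 5 units/s, burst 100) yields 100, 40, 45 and -5 credits, in that order; one second after the last step the bucket is back at 0 and no longer exhausted.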
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] apovzner commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r464807467



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
##########
@@ -223,8 +229,14 @@ public void checkQuotas(long timeMs) {
                 Quota quota = config.quota();
                 if (quota != null) {
                     double value = metric.measurableValue(timeMs);
-                    if (!quota.acceptable(value)) {
-                        throw new QuotaViolationException(metric, value, quota.bound());
+                    if (metric.measurable() instanceof TokenBucket) {
+                        if (value <= 0) {

Review comment:
       We probably just want a `value < 0` check here, right? Otherwise the throttle time will be 0 anyway.
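A hypothetical pair of helpers makes the argument concrete. The names `isViolated` and `throttleTimeMs` are illustrative, and the throttle formula assumes the `-value / refill rate * 1000` rule documented later in this thread for `throttleTime`. At exactly zero credits the computed throttle is already 0 ms, so a `<= 0` check would only raise a violation that carries no delay.

```java
final class StrictCheckSketch {
    private StrictCheckSketch() {}

    /** Hypothetical check: the quota is violated only when credits are strictly negative. */
    static boolean isViolated(double credits) {
        return credits < 0;
    }

    /**
     * Hypothetical throttle time: how long the refill at ratePerSec needs
     * to bring the credits back to zero, rounded to whole milliseconds.
     */
    static long throttleTimeMs(double credits, double ratePerSec) {
        return Math.round(-credits / ratePerSec * 1000);
    }
}
```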







[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669977494


   @junrao I have updated the KIP and posted an update to the thread on the mailing list.





[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669500012


   test this please





[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465921001



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(10);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 40 credits
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 5, expect 45 credits
+        time.sleep(2000);
+        tk.record(config, 5, time.milliseconds());
+        assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect -5 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+    }
+
+    @Test
+    public void testUnrecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units

Review comment:
       we have 10 samples now.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defines the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket

Review comment:
       Could we document how this quota behaves differently from the existing quota?
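One way to illustrate the difference is to replay the scenario from the `SensorTest` cases added in this PR (quota 2 units/s, 11 samples of 1 s, 30 units recorded at t = 0) against two simplified models. Both are hypothetical sketches, not Kafka's `Rate` or `TokenBucket`: the windowed rate assumes a single recorded sample that expires once it is `samples * windowMs` old and a window padded to at least `samples - 1` full windows, as described in the review comments; the token bucket starts full with a burst of 20, which is what the -10 credits expected by that test imply.

```java
final class QuotaComparisonSketch {
    private QuotaComparisonSketch() {}

    /**
     * Simplified windowed rate (units/s) for a single value recorded at t = 0.
     * Assumptions: samples >= 2, the sample expires once it is samples * windowMs
     * old, and the elapsed time is padded to at least (samples - 1) full windows.
     */
    static double windowedRate(double value, long nowMs, int samples, long windowMs) {
        if (nowMs >= samples * windowMs) {
            return 0.0; // the only sample has expired
        }
        long elapsedMs = nowMs;
        long fullWindows = elapsedMs / windowMs;
        if (fullWindows < samples - 1) {
            elapsedMs += (samples - 1 - fullWindows) * windowMs;
        }
        return value / (elapsedMs / 1000.0);
    }

    /** Token bucket credits when a full bucket is charged value at t = 0, then refills. */
    static double tokenBucketCredits(double value, long nowMs, double quotaPerSec, double burst) {
        double creditsAtZero = burst - value;
        return Math.min(burst, creditsAtZero + quotaPerSec * nowMs / 1000.0);
    }
}
```

Under these assumptions the windowed rate is still 3 units/s after 5 s and stays above the quota of 2 until the sample expires at 11 s, whereas the bucket climbs from -10 back to 0 credits in exactly 10 / 2 = 5 s.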

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units

Review comment:
       we have 10 samples now.

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));

Review comment:
       Is the test based on 10 samples?







[GitHub] [kafka] apovzner commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r464810412



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;
+        refill(quota, burst, timeMs);
+        return this.credits;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;

Review comment:
       If we want the burst to be more similar to the original behavior, it seems like this should be `#samples`. With the current implementation, we can do 1 unit of work in the oldest window and then accept a burst right at the end of the last (not yet full) sample, which means that the max burst size is almost `#samples * quota` (if a sample is 1 sec and the quota is in units/second). Does this sound right to you?
   
   Also, I think we should take into account `config.timeWindowMs`, because it could be something other than 1 second.
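The suggested change can be checked numerically. This hypothetical helper (class and method names are illustrative) converts `timeWindowMs` to seconds before multiplying, assuming the quota is expressed in units per second:

```java
final class BurstSketch {
    private BurstSketch() {}

    /** Burst as in the hunk under review: (samples - 1) full windows of credit. */
    static double burstSamplesMinusOne(int samples, long timeWindowMs, double quotaPerSec) {
        return (samples - 1) * (timeWindowMs / 1000.0) * quotaPerSec;
    }

    /** Burst as suggested in the review: every sample, including the partial last one. */
    static double burstAllSamples(int samples, long timeWindowMs, double quotaPerSec) {
        return samples * (timeWindowMs / 1000.0) * quotaPerSec;
    }
}
```

With 10 samples of 2 s and a quota of 5 units/s, the two formulas give 90 and 100 units; the latter matches the 100 initial credits expected by the revision of `TokenBucketTest` that uses `samples(10)`.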







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669499409


   ok to test





[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465038614



##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -131,6 +133,16 @@ class PermissiveControllerMutationQuota(private val time: Time,
 
 object ControllerMutationQuotaManager {
   val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+
+  def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
+    e.metric().measurable() match {
+      case _: TokenBucket => Math.round(-e.value() * e.bound())

Review comment:
       Yes, that's right.







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669378432


   ok to test





[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465310710



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {

Review comment:
       Could we add a high level description of the class?

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", "test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring the remaining credits back to zero:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);

Review comment:
       sensor.record() always calls checkQuotas(). Why do we need to call it explicitly here?
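One way to see why a test might check quotas before recording: assuming `Sensor.record()` records the value first and only then checks quotas, a request that exceeds the quota is still counted. The hypothetical `EnforcementOrderSketch` below (not Kafka's `Sensor` API) contrasts the two orders on a simple token bucket with a refill rate of 2 units/s and a burst of 20, which is what the -10 credits expected in `testStrictQuotaEnforcementWithTokenBucket` imply.

```java
/** Hypothetical bucket contrasting two enforcement orders (not Kafka's Sensor API). */
final class EnforcementOrderSketch {
    private final double ratePerSec;
    private final double burst;
    private double credits;
    private long lastUpdateMs;

    EnforcementOrderSketch(double ratePerSec, double burst, long nowMs) {
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.credits = burst; // start with a full bucket
        this.lastUpdateMs = nowMs;
    }

    /** Adds the credits earned since the last update, capped at burst. */
    private void refill(long nowMs) {
        credits = Math.min(burst, credits + ratePerSec * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }

    /** Record, then check: the value is counted even when the check fails. */
    boolean recordThenCheck(double value, long nowMs) {
        refill(nowMs);
        credits -= value;
        return credits >= 0;
    }

    /** Check, then record: once credits are negative, a value is rejected and not counted. */
    boolean checkThenRecord(double value, long nowMs) {
        refill(nowMs);
        if (credits < 0) {
            return false;
        }
        credits -= value;
        return true;
    }

    double credits() {
        return credits;
    }
}
```

With check-then-record, the first 30 units are admitted (credits drop to -10), a second request at the same instant is rejected without being counted, and after 5 s the bucket is back at 0 and admits the next 30 units, mirroring the expectations in that test. With record-then-check, both requests are counted and the balance keeps falling.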

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 40 credits
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 5, expect 45 credits
+        time.sleep(2000);
+        tk.record(config, 5, time.milliseconds());
+        assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect -5 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+    }
+
+    @Test
+    public void testUnrecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 100 credits

Review comment:
       We are recording -60.
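Recording a negative value hands credits back, and the `Math.min(burst, ...)` in the later revision of `record` caps the result at the burst. That is consistent with the test still expecting 100 credits after -60 is recorded into a full bucket. A tiny helper (hypothetical name) reproduces the arithmetic:

```java
final class UnrecordSketch {
    private UnrecordSketch() {}

    /**
     * Applies a recorded value to the credits: positive values consume credits,
     * negative values give them back, and the result never exceeds burst.
     */
    static double apply(double credits, double value, double burst) {
        return Math.min(burst, credits - value);
    }
}
```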

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -131,6 +133,22 @@ class PermissiveControllerMutationQuota(private val time: Time,
 
 object ControllerMutationQuotaManager {
   val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+
+  /**
+   * This calculates the amount of time needed to bring the TokenBucket within quota
+   * assuming that no new metrics are recorded.
+   *
+   * Basically, if a value < 0 is observed, the time required to bring it to zero is
+   * -value / refill rate (quota bound) * 1000.
+   */
+  def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {

Review comment:
       throttleTime => throttleTimeMs ?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double tokens;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.tokens = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        return this.tokens;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        this.tokens = Math.min(burst, this.tokens - value);
+    }
+
+    private void refill(final double quota, final double burst, final long timeMs) {
+        this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs));
+        this.lastUpdateMs = timeMs;
+    }
+
+    private double burst(final MetricConfig config) {
+        return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound();
+    }
+
+    private double convert(final long timeMs) {
+        switch (unit) {

Review comment:
       This code is duplicated from Rate. Could we reuse it somehow?
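One way to remove the duplication is a single static helper shared by both stats. The class name `TimeConversionSketch` and the signature `convert(long, TimeUnit)` are hypothetical; an earlier revision in this thread imports a `convert` from `org.apache.kafka.common.metrics.internals.MetricsUtils`, but its exact signature is not shown here.

```java
import java.util.concurrent.TimeUnit;

final class TimeConversionSketch {
    private TimeConversionSketch() {}

    /** Converts a duration in milliseconds to the given unit, as a double. */
    static double convert(long timeMs, TimeUnit unit) {
        switch (unit) {
            case NANOSECONDS:
                return timeMs * 1000.0 * 1000.0;
            case MICROSECONDS:
                return timeMs * 1000.0;
            case MILLISECONDS:
                return timeMs;
            case SECONDS:
                return timeMs / 1000.0;
            case MINUTES:
                return timeMs / (60.0 * 1000.0);
            case HOURS:
                return timeMs / (60.0 * 60.0 * 1000.0);
            case DAYS:
                return timeMs / (24.0 * 60.0 * 60.0 * 1000.0);
            default:
                throw new IllegalStateException("Unknown unit: " + unit);
        }
    }
}
```

Both the windowed rate and the token bucket could then call the same helper instead of each carrying its own `switch`.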

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double tokens;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.tokens = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        return this.tokens;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        this.tokens = Math.min(burst, this.tokens - value);
+    }
+
+    private void refill(final double quota, final double burst, final long timeMs) {
+        this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs));
+        this.lastUpdateMs = timeMs;
+    }
+
+    private double burst(final MetricConfig config) {
+        return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound();

Review comment:
       Rate actually allows the windowSize to be close to the full samples * perSampleWindow. The logic around `config.samples() - 1` is just to make sure the windowSize contains at least that many full windows. So, to match that behavior, it seems that burst should use `config.samples()`.
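The window-size rule described here can be modeled in a few lines. This is a hypothetical sketch, not copied from `Rate`: it assumes elapsed time is measured from the start of the oldest retained sample and that samples are dropped once they are `samples * timeWindowMs` old. With 11 samples of 1 s, the window is padded to 10 s early on but can grow to just under 11 s, close to `samples * timeWindowMs`.

```java
final class WindowSizeSketch {
    private WindowSizeSketch() {}

    /**
     * Hypothetical window size of a windowed rate: the elapsed time since the
     * oldest sample started, padded so it always spans at least
     * (samples - 1) full windows.
     */
    static long windowSizeMs(long elapsedSinceOldestMs, int samples, long timeWindowMs) {
        long elapsed = elapsedSinceOldestMs;
        long fullWindows = elapsed / timeWindowMs;
        long minFullWindows = samples - 1;
        if (fullWindows < minFullWindows) {
            elapsed += (minFullWindows - fullWindows) * timeWindowMs;
        }
        return elapsed;
    }
}
```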

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -156,6 +174,24 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
       quotaMetricTags.asJava)
   }
 
+  protected def clientTokenBucketMetricName(quotaMetricTags: Map[String, String]): MetricName = {

Review comment:
       Could this be private?

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", "test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);
+            sensor.record(value, timeMs, false);
+        }
+    }
+
+    @Test
+    public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor");
+
+        final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat1Name = metrics.metricName("stat1", "test-group");
+        final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5));
+        sensor.add(stat1Name, stat1, stat1Config);
+
+        final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat2Name = metrics.metricName("stat2", "test-group");
+        final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10));
+        sensor.add(stat2Name, stat2, stat2Config);
+
+        sensor.record(10, 1);
+        Mockito.verify(stat1).record(stat1Config, 10, 1);
+        Mockito.verify(stat2).record(stat2Config, 10, 1);
+
+        Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
+        Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
+        sensor.checkQuotas(2);

Review comment:
       Here, we are just verifying there is no quota exception?

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 5 * 2 * (11 - 1) = 100 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 40 credits
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 5, expect 45 credits
+        time.sleep(2000);
+        tk.record(config, 5, time.milliseconds());
+        assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect -5 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+    }
+
+    @Test
+    public void testUnrecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 5 * 2 * (11 - 1) = 100 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record -60 at T, expect 100 credits
+        tk.record(config, -60, time.milliseconds());
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect 40 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect 100 credits

Review comment:
       We are recording -60.
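A quick trace supports this: with the refill/record logic quoted in the diff, recording -60 gives credits back and is capped at the burst, so 100 is the right expectation after a -60 record. The sketch below is a simplified, seconds-based re-implementation (the class `UnrecordTrace` is hypothetical, not Kafka's `TokenBucket`) that replays `testUnrecord` with a burst of 100.

```java
// Simplified model of the TokenBucket refill/record logic quoted in the diff
// (seconds-based, quota in units per second). Not Kafka's class; for tracing only.
final class UnrecordTrace {
    private final double quotaPerSec;
    private final double burst;
    private double tokens = 0;      // same initial state as the PR
    private long lastUpdateMs = 0;  // same initial state as the PR

    UnrecordTrace(double quotaPerSec, double burst) {
        this.quotaPerSec = quotaPerSec;
        this.burst = burst;
    }

    // Add credits for the elapsed time, never exceeding the burst.
    private void refill(long nowMs) {
        tokens = Math.min(burst, tokens + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }

    double measure(long nowMs) {
        refill(nowMs);
        return tokens;
    }

    // A negative value "unrecords": it gives credits back, but never above the burst.
    void record(double value, long nowMs) {
        refill(nowMs);
        tokens = Math.min(burst, tokens - value);
    }

    // Replays testUnrecord: quota = 5/s, window = 2s, samples = 11, so burst = 100.
    static double[] trace() {
        long t = 1_000_000L;
        UnrecordTrace tk = new UnrecordTrace(5, 100);
        double[] out = new double[4];
        out[0] = tk.measure(t);   // first refill fills the bucket
        tk.record(-60, t);
        out[1] = tk.measure(t);   // already full, stays capped at the burst
        t += 2000;
        tk.record(60, t);
        out[2] = tk.measure(t);   // refill is capped at 100, then 100 - 60
        t += 2000;
        tk.record(-60, t);
        out[3] = tk.measure(t);   // 40 + 10 refilled + 60 returned, capped at 100
        return out;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(trace())); // [100.0, 100.0, 40.0, 100.0]
    }
}
```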







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465889286



##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -156,6 +174,24 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
       quotaMetricTags.asJava)
   }
 
+  protected def clientTokenBucketMetricName(quotaMetricTags: Map[String, String]): MetricName = {

Review comment:
       Yes.







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465886919



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", "test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);

Review comment:
       In the above two tests, I simulate "strict quotas" in the sense that recording is not allowed if the quota is already violated. Therefore, I check the quota before recording the value.
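A minimal sketch of that check-then-record pattern, assuming (as the token bucket test implies) that the quota counts as violated only once the remaining credits are negative. `StrictRecordSketch` and `QuotaViolation` are hypothetical stand-ins, not Kafka classes; the numbers mirror `testStrictQuotaEnforcementWithTokenBucket` (quota 2/s, 1s window, 11 samples, so a burst of 20).

```java
// Minimal sketch of strict quota enforcement on top of a token bucket.
// Assumption: the quota is considered violated when remaining credits are < 0.
final class StrictRecordSketch {
    static final class QuotaViolation extends RuntimeException {
        QuotaViolation(String message) { super(message); }
    }

    private final double quotaPerSec;
    private final double burst;
    private double tokens = 0;
    private long lastUpdateMs = 0;

    StrictRecordSketch(double quotaPerSec, double burst) {
        this.quotaPerSec = quotaPerSec;
        this.burst = burst;
    }

    private void refill(long nowMs) {
        tokens = Math.min(burst, tokens + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }

    synchronized double credits(long nowMs) {
        refill(nowMs);
        return tokens;
    }

    // Check first, then record, under one lock: a request is rejected only if the
    // quota is already violated, so a single large request may drive credits negative.
    synchronized void strictRecord(double value, long nowMs) {
        refill(nowMs);
        if (tokens < 0)
            throw new QuotaViolation("remaining credits: " + tokens);
        tokens = Math.min(burst, tokens - value);
    }

    public static void main(String[] args) {
        long t = 1_000_000L;
        StrictRecordSketch bucket = new StrictRecordSketch(2, 20);
        bucket.strictRecord(30, t);
        System.out.println(bucket.credits(t)); // -10.0
        t += 5000;
        System.out.println(bucket.credits(t)); // 0.0, so the next record is allowed
        bucket.strictRecord(30, t);
        System.out.println(bucket.credits(t)); // -30.0
        try {
            bucket.strictRecord(30, t);
        } catch (QuotaViolation e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check happens before the record, the first 30-unit request is accepted even though it exceeds the burst; only a following request is rejected until the credits recover to zero.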







[GitHub] [kafka] apovzner commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r466161719



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;

Review comment:
       ah right, `lastUpdateMs` makes sure that the bucket is full on the first `record()`.







[GitHub] [kafka] apovzner commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r466162270



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;
+        refill(quota, burst, timeMs);
+        return this.credits;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;

Review comment:
       hm.. maybe I just did not notice `config.timeWindowMs`. All good now.







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465968369



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));

Review comment:
       The config is correct: 11 samples. With the few samples recorded in the test, the total window is effectively 10s (the (11 - 1) full windows), which is why I use 10 in the formulas.
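The effect of that 10s floor can be reproduced with a simplified model of a windowed-sum rate. This is a sketch, not Kafka's `Rate`: the class `RateWindowSketch` is hypothetical, it assumes a single sample started at T+0, and it pads the elapsed time up to `(samples - 1)` full windows, as the `config.samples() - 1` logic does.

```java
// Simplified model of a windowed-sum rate with a single sample started at startMs.
// Assumption: the window used for the division is padded up to at least
// (samples - 1) full windows. Sample expiry is not modeled.
final class RateWindowSketch {
    static double windowSizeSec(long startMs, long nowMs, int samples, long windowMs) {
        long elapsedMs = nowMs - startMs;
        long fullWindows = elapsedMs / windowMs;
        long minFullWindows = samples - 1;
        if (fullWindows < minFullWindows)
            elapsedMs += (minFullWindows - fullWindows) * windowMs;
        return elapsedMs / 1000.0;
    }

    static double rate(double sum, long startMs, long nowMs, int samples, long windowMs) {
        return sum / windowSizeSec(startMs, nowMs, samples, windowMs);
    }

    public static void main(String[] args) {
        // SensorTest numbers: 30 recorded at T+0, window = 1s, samples = 11.
        System.out.println(rate(30, 0, 0, 11, 1000));    // 3.0 (window padded to 10s)
        System.out.println(rate(30, 0, 5000, 11, 1000)); // 3.0 (5s elapsed, still padded to 10s)
    }
}
```

Both calls return 3.0, which is why `testStrictQuotaEnforcementWithRate` still sees a rate of 3 after sleeping 5s and rejects the second record.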







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465889447



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double tokens;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.tokens = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        return this.tokens;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        this.tokens = Math.min(burst, this.tokens - value);
+    }
+
+    private void refill(final double quota, final double burst, final long timeMs) {
+        this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs));
+        this.lastUpdateMs = timeMs;
+    }
+
+    private double burst(final MetricConfig config) {
+        return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound();
+    }
+
+    private double convert(final long timeMs) {
+        switch (unit) {

Review comment:
       Sure, makes sense.







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465040672



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;

Review comment:
       The burst is not known when the `TokenBucket` is constructed, because we need the `MetricConfig` at least once to initialise it correctly. Note that it will be initialised to the maximum burst when the bucket is refilled for the first time, i.e. when either `measure` or `record` is called. That works because `lastUpdateMs` is `0`, so the delta between `now` and it fills the bucket. So the bucket actually starts at the maximum burst.
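A tiny sketch of the initialisation argument, using a hypothetical `refill` helper that mirrors the formula in the diff (quota in units per second). With `lastUpdateMs = 0`, the first refill sees a delta of roughly the current epoch time and is capped at the burst; the second call shows a hypothetical alternative in which `lastUpdateMs` would be set at construction, so the bucket would start empty.

```java
// Sketch of why tokens = 0 and lastUpdateMs = 0 yield a full bucket on first use:
// the first refill sees a huge time delta and is capped at the burst.
final class InitialFillSketch {
    static double refill(double tokens, long lastUpdateMs, long nowMs,
                         double quotaPerSec, double burst) {
        return Math.min(burst, tokens + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0);
    }

    public static void main(String[] args) {
        long now = 1_596_475_385_000L; // any realistic epoch-millisecond timestamp
        // As in the PR: lastUpdateMs = 0, so the first refill fills the bucket.
        System.out.println(refill(0, 0, now, 2, 20));   // 20.0
        // Hypothetical alternative: lastUpdateMs = now at construction, so it starts empty.
        System.out.println(refill(0, now, now, 2, 20)); // 0.0
    }
}
```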







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465888817



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", "test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);
+            sensor.record(value, timeMs, false);
+        }
+    }
+
+    @Test
+    public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor");
+
+        final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat1Name = metrics.metricName("stat1", "test-group");
+        final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5));
+        sensor.add(stat1Name, stat1, stat1Config);
+
+        final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat2Name = metrics.metricName("stat2", "test-group");
+        final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10));
+        sensor.add(stat2Name, stat2, stat2Config);
+
+        sensor.record(10, 1);
+        Mockito.verify(stat1).record(stat1Config, 10, 1);
+        Mockito.verify(stat2).record(stat2Config, 10, 1);
+
+        Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
+        Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
+        sensor.checkQuotas(2);

Review comment:
       No. This test actually reproduces a bug that I found. A stat can be added to the Sensor with its own `MetricConfig`, but the Sensor was not using that config when recording a value; it always used the Sensor's own config instead. This test verifies that the correct config is used both for recording and for measuring via `checkQuotas`.
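A stripped-down illustration of the fixed behavior, with hypothetical `Config`, `Stat` and `MiniSensor` types standing in for `MetricConfig`, `MeasurableStat` and `Sensor` (only the recording path is shown): each stat keeps the config it was added with and receives that config on every record, falling back to the sensor's config when none was supplied.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Sensor, MeasurableStat and MetricConfig.
final class PerStatConfigSketch {
    // Only the quota bound matters for this illustration.
    static final class Config {
        final double quotaBound;
        Config(double quotaBound) { this.quotaBound = quotaBound; }
    }

    interface Stat {
        void record(Config config, double value, long timeMs);
    }

    static final class MiniSensor {
        private final Config sensorConfig;
        private final List<Stat> stats = new ArrayList<>();
        private final List<Config> configs = new ArrayList<>();

        MiniSensor(Config sensorConfig) { this.sensorConfig = sensorConfig; }

        // Keep the stat's own config, falling back to the sensor's config when none is given.
        void add(Stat stat, Config config) {
            stats.add(stat);
            configs.add(config != null ? config : sensorConfig);
        }

        // Pass each stat the config it was registered with, not the sensor's config.
        void record(double value, long timeMs) {
            for (int i = 0; i < stats.size(); i++)
                stats.get(i).record(configs.get(i), value, timeMs);
        }
    }

    // Mirrors the test: two stats with quota bounds 5 and 10, one record at time 1.
    static List<Double> recordedBounds() {
        List<Double> seen = new ArrayList<>();
        MiniSensor sensor = new MiniSensor(new Config(Double.MAX_VALUE));
        sensor.add((config, value, timeMs) -> seen.add(config.quotaBound), new Config(5));
        sensor.add((config, value, timeMs) -> seen.add(config.quotaBound), new Config(10));
        sensor.record(10, 1);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(recordedBounds()); // [5.0, 10.0]
    }
}
```

Each stat sees its own quota bound rather than the sensor's, which is what the two `Mockito.verify` calls in the test assert for the recording path.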







[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669343564


   @junrao Thanks for your comments. I just pushed an update which addressed them.





[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465038411



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;
+        refill(quota, burst, timeMs);
+        return this.credits;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;

Review comment:
       That's a very good question. I use `#samples - 1` here because `Rate#windowSize` does the same when computing the total window, so the behavior is probably closer this way.
   
   @apovzner `config.timeWindowMs` is already in the formula or did I misunderstand your comment?







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465997624



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is

Review comment:
       I have removed the "and this". Does it parse better?







[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465989800



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is

Review comment:
       "this until it is" doesn't quite parse.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is
+ * discarded. Practically, when this happens, one must wait until the sample is expired to
+ * bring the rate below the quota even though less time would be theoretically required. As an
+ * examples, let's imagine that we have:
+ * - Quota (Q)   = 5
+ * - Samples (S) = 100
+ * - Window (W)  = 1s
+ * A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The throttle time is computed as

Review comment:
       "The throttle time " : I guess this is the expected throttle time?







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465040813



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
##########
@@ -223,8 +229,14 @@ public void checkQuotas(long timeMs) {
                 Quota quota = config.quota();
                 if (quota != null) {
                     double value = metric.measurableValue(timeMs);
-                    if (!quota.acceptable(value)) {
-                        throw new QuotaViolationException(metric, value, quota.bound());
+                    if (metric.measurable() instanceof TokenBucket) {
+                        if (value <= 0) {

Review comment:
       Yes, that makes sense.
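For context, the javadoc quoted elsewhere in this thread states that the quota is exhausted when the remaining credits are below zero, while the diff above checks `value <= 0`. A minimal sketch of a check that follows the javadoc's `< 0` wording is below; `QuotaCheckSketch`, `Kind`, and `isViolated` are hypothetical names, and only an upper-bound rate quota is modeled.

```java
public class QuotaCheckSketch {

    // Hypothetical discriminator standing in for `metric.measurable() instanceof TokenBucket`.
    enum Kind { RATE, TOKEN_BUCKET }

    /**
     * Returns true when the measured value violates the quota.
     * TOKEN_BUCKET: the value is the remaining token count; following the javadoc,
     * the quota is exhausted only once the bucket goes below zero.
     * RATE: the value is a rate and must not exceed the (upper) bound.
     */
    static boolean isViolated(Kind kind, double value, double bound) {
        if (kind == Kind.TOKEN_BUCKET)
            return value < 0;
        return value > bound;
    }

    public static void main(String[] args) {
        System.out.println(isViolated(Kind.TOKEN_BUCKET, 0.0, 5.0));  // false: empty but not negative
        System.out.println(isViolated(Kind.TOKEN_BUCKET, -0.5, 5.0)); // true
        System.out.println(isViolated(Kind.RATE, 5.6, 5.0));          // true
    }
}
```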







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669510023


   test this please





[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669622211


   The failed unit test seems unrelated.
   
   @apovzner : Do you have any other comments on this? Thanks.





[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669465274


   @junrao Thanks. I have updated the PR.





[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669499767


   retest this please





[GitHub] [kafka] apovzner commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r464811830



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;

Review comment:
       I also think it's better to start with a full burst of credits. We would normally create the sensor on the first byte/request/etc., and before that the <user,client> was idle, which means it was "accumulating credits".
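To make the cold-start effect concrete, here is a hypothetical, simplified bucket (`ColdStartSketch.Bucket` is not Kafka code) that takes the initial token count as a parameter. It assumes a quota of 5 tokens/s and a burst of `samples * window * quota` with 100 samples of 1 s; the only difference between the two buckets is whether they start empty or full.

```java
public class ColdStartSketch {

    // Minimal token bucket; the refill rate and burst are fixed at construction.
    static final class Bucket {
        final double ratePerSec;
        final double burst;
        double tokens;
        long lastUpdateMs;

        Bucket(double ratePerSec, double burst, double initialTokens, long nowMs) {
            this.ratePerSec = ratePerSec;
            this.burst = burst;
            this.tokens = initialTokens;
            this.lastUpdateMs = nowMs;
        }

        // Refill for the elapsed time (capped at the burst), then consume the value.
        void record(double value, long nowMs) {
            tokens = Math.min(burst, tokens + ratePerSec * (nowMs - lastUpdateMs) / 1000.0);
            lastUpdateMs = nowMs;
            tokens -= value;
        }

        // Time until the bucket is back at zero tokens; 0 if it is not in debt.
        long throttleTimeMs() {
            return tokens < 0 ? Math.round(-tokens / ratePerSec * 1000) : 0L;
        }
    }

    public static void main(String[] args) {
        double quota = 5.0;               // tokens per second
        double burst = 100 * 1.0 * quota; // samples * window (s) * quota = 500
        Bucket empty = new Bucket(quota, burst, 0.0, 0L);
        Bucket full = new Bucket(quota, burst, burst, 0L);

        // The first request from a previously idle client records a value of 10.
        empty.record(10, 0L);
        full.record(10, 0L);
        System.out.println(empty.throttleTimeMs()); // 2000: throttled on a cold start
        System.out.println(full.throttleTimeMs());  // 0: idle time treated as accumulated credits
    }
}
```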







[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-668593472


   @junrao @apovzner I have updated the PR to address your comments. I have also refactored the code a bit and updated all the tests of the controller quota manager to work with the Token Bucket.
   
   From a public API perspective, we would have a new metric per user/clientId named `tokens`, which represents the number of tokens in the bucket.
   
   Please pay close attention to the unit tests of the controller mutation quota manager. Most of the throttle times there have been updated to match the token bucket behavior. This is because the token bucket does not have to wait for old samples to be purged before its token count is updated; it is updated continuously.
   
   I wonder whether we should use it by default straight away or put it behind a config flag to let users opt in. It does not matter much for our new quota, but it may matter if/when we migrate existing quotas. What do you think?





[GitHub] [kafka] junrao merged pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #9114:
URL: https://github.com/apache/kafka/pull/9114


   





[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465997810



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defined the refill rate of the bucket while the maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits in the bucket
+ * is below zero. The enforcement is done by the {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * Token Bucket vs Rate based Quota:
+ * The current sampled rate based quota does not cope well with bursty workloads. The issue is
+ * that a unique and large sample can hold the average above the quota and this until it is
+ * discarded. Practically, when this happens, one must wait until the sample is expired to
+ * bring the rate below the quota even though less time would be theoretically required. As an
+ * examples, let's imagine that we have:
+ * - Quota (Q)   = 5
+ * - Samples (S) = 100
+ * - Window (W)  = 1s
+ * A burst of 560 brings the average rate (R) to 5.6 (560 / 100). The throttle time is computed as

Review comment:
       Yes, that is correct. I have added "expected" to be clearer.
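The javadoc example (Q = 5, S = 100, W = 1s, a burst of 560) can be worked through numerically. This sketch assumes the rate-based throttle is `(R - Q) / Q * S * W`, i.e. the overshoot relative to the quota scaled by the full sampled window, and that the token bucket starts full at `S * W * Q` tokens, as the javadoc's burst definition says. The class and method names are hypothetical.

```java
public class ExpectedThrottleSketch {

    // Rate-based (assumed formula): the rate overshoot relative to Q, scaled by S * W.
    static long rateThrottleMs(double burstValue, double quota, int samples, double windowSec) {
        double rate = burstValue / (samples * windowSec);
        if (rate <= quota) return 0L;
        return Math.round((rate - quota) / quota * samples * windowSec * 1000);
    }

    // Token bucket: start full at S * W * Q, subtract the burst, and wait for the
    // deficit to refill at Q tokens per second.
    static long tokenBucketThrottleMs(double burstValue, double quota, int samples, double windowSec) {
        double tokens = samples * windowSec * quota - burstValue;
        if (tokens >= 0) return 0L;
        return Math.round(-tokens / quota * 1000);
    }

    public static void main(String[] args) {
        double q = 5, w = 1;
        int s = 100;
        System.out.println(rateThrottleMs(560, q, s, w));        // 12000
        System.out.println(tokenBucketThrottleMs(560, q, s, w)); // 12000
        // Assuming samples expire after S windows, the large sample can
        // keep the average above Q for up to roughly S * W.
        System.out.println(Math.round(s * w * 1000));            // 100000
    }
}
```

Both approaches arrive at about 12 seconds. The difference the javadoc describes is that the rate-based quota only drops back under the bound once the large sample is discarded, while the token bucket refills continuously.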







[GitHub] [kafka] junrao commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669509903


   retest this please





[GitHub] [kafka] apovzner commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
apovzner commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-668399674


   I also took a quick look and I agree that this implementation is cleaner and definitely easier to understand.





[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r464724281



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;
+        refill(quota, burst, timeMs);
+        return this.credits;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = (config.samples() - 1) * convert(config.timeWindowMs()) * quota;

Review comment:
       Should burst be computed from `#samples` or `#samples - 1` ?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double credits;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.credits = 0;

Review comment:
       Should we start with 0 credit or the full burst credits? The benefit of the latter is that during initialization, the requests won't be throttled as much due to a cold start.

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -131,6 +133,16 @@ class PermissiveControllerMutationQuota(private val time: Time,
 
 object ControllerMutationQuotaManager {
   val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+
+  def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
+    e.metric().measurable() match {
+      case _: TokenBucket => Math.round(-e.value() * e.bound())

Review comment:
       Hmm, assuming bound is the per sec rate, it seems that the throttleTimeMs should be `-e.value() / e.bound * 1000`?
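A quick check of the units in the comment above: the measured value is the (negative) number of tokens and the bound is the refill rate in tokens per second, so the deficit divided by the rate gives seconds, which are then multiplied by 1000. The Java sketch below (hypothetical class `TokenBucketThrottleTime`, not the Scala code under review) compares that with the expression in the diff.

```java
public class TokenBucketThrottleTime {

    /**
     * Throttle time for a token bucket whose measured value is the remaining token count
     * and whose bound is the refill rate in tokens per second:
     * deficit / rate = seconds to refill, converted to milliseconds.
     */
    static long throttleTimeMs(double tokens, double quotaPerSec) {
        if (tokens >= 0) return 0L;
        return Math.round(-tokens / quotaPerSec * 1000);
    }

    // The expression from the diff, kept only for comparison: it multiplies by the rate.
    static long originalExpression(double tokens, double quotaPerSec) {
        return Math.round(-tokens * quotaPerSec);
    }

    public static void main(String[] args) {
        // 20 tokens in debt with a quota of 10 tokens/s: 2 seconds to refill.
        System.out.println(throttleTimeMs(-20, 10));     // 2000
        System.out.println(originalExpression(-20, 10)); // 200
    }
}
```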







[GitHub] [kafka] dajac commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465889824



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double tokens;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.tokens = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        return this.tokens;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        this.tokens = Math.min(burst, this.tokens - value);
+    }
+
+    private void refill(final double quota, final double burst, final long timeMs) {
+        this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - lastUpdateMs));
+        this.lastUpdateMs = timeMs;
+    }
+
+    private double burst(final MetricConfig config) {
+        return (config.samples() - 1) * convert(config.timeWindowMs()) * config.quota().bound();

Review comment:
       Ack. I misunderstood it. `config.samples()` sounds good to me.
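   To illustrate the burst computation discussed here, below is a minimal standalone sketch. It mirrors the `refill`/`record` logic from the diff, with these assumptions:
   * The burst is changed to `config.samples() * window * quota`, as this comment suggests, instead of `(config.samples() - 1) * ...`.
   * Seconds are hardcoded as the rate unit.
   * A negative token count is treated as a quota violation.
   The class name `TokenBucketSketch` and the `violated` helper are hypothetical and are not part of the PR.

```java
/**
 * Hypothetical standalone sketch of the token bucket quoted in the diff.
 * Assumption: burst = samples * windowSeconds * quota (per the review
 * discussion), and the rate unit is fixed to seconds.
 */
class TokenBucketSketch {
    private final double quotaPerSec; // refill rate, in tokens per second
    private final double burst;       // maximum number of tokens the bucket can hold
    private double tokens;            // may go negative after a large record()
    private long lastUpdateMs;

    TokenBucketSketch(double quotaPerSec, int samples, long timeWindowMs) {
        this.quotaPerSec = quotaPerSec;
        this.burst = samples * toSeconds(timeWindowMs) * quotaPerSec;
        this.tokens = 0;        // same initial state as the diff
        this.lastUpdateMs = 0;
    }

    private static double toSeconds(long ms) {
        return ms / 1000.0;
    }

    // Add tokens for the elapsed time, capped at the burst size.
    private void refill(long timeMs) {
        tokens = Math.min(burst, tokens + quotaPerSec * toSeconds(timeMs - lastUpdateMs));
        lastUpdateMs = timeMs;
    }

    double measure(long timeMs) {
        refill(timeMs);
        return tokens;
    }

    // Consume tokens; there is no lower bound, so a large value drives the bucket negative.
    void record(double value, long timeMs) {
        refill(timeMs);
        tokens = Math.min(burst, tokens - value);
    }

    // Assumption: the quota is violated while the bucket is in debt.
    boolean violated(long timeMs) {
        return measure(timeMs) < 0;
    }

    double burst() {
        return burst;
    }
}
```

   For example, with a quota of 10 per second, 2 samples, and a 30 s window, the burst is 2 * 30 * 10 = 600 tokens. The `(config.samples() - 1)` variant would yield 300. Tokens are not floored at zero in `record`, so a single large request is admitted and leaves the bucket negative. Further requests are then rejected until the refill pays off the debt.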

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {

Review comment:
       Sure.







[GitHub] [kafka] dajac commented on pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#issuecomment-669512698


   Sure. I will update the KIP and the mailing list tomorrow. Thanks, Jun!

