You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2022/08/10 13:25:14 UTC

[kafka] branch trunk updated: Fix the rate window size calculation for edge cases (#12184)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f17928e4be3 Fix the rate window size calculation for edge cases (#12184)
f17928e4be3 is described below

commit f17928e4be35d1d22b50aa575697b4c2601a2ea0
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Wed Aug 10 15:25:05 2022 +0200

    Fix the rate window size calculation for edge cases (#12184)
    
    ## Problem
    Implementation of connection creation rate quotas in Kafka is dependent on two configurations:
    [quota.window.num](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.num) AND [quota.window.size.seconds](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.size.seconds)
    
    The minimum possible value of each of these configurations is 1, as per the documentation. However, when we set either configuration to 1, we can hit a situation where the rate is calculated as NaN (which, in turn, leads to exceptions). This specific scenario occurs when an event is recorded at the start of a sample window.
    
    ## Solution
    This patch fixes this edge case by ensuring that the windowSize over which Rate is calculated is at least 1ms (even if it is calculated at the start of the sample window).
    
    ## Test
    Added a unit test which fails before the patch and passes after the patch
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, David Mao <dm...@confluent.io>
---
 .../apache/kafka/common/metrics/stats/Rate.java    |  5 +-
 .../apache/kafka/common/metrics/MetricsTest.java   |  7 ++-
 .../kafka/common/metrics/stats/RateTest.java       | 67 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index c6b8574186a..09b7c05c8f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -91,7 +91,10 @@ public class Rate implements MeasurableStat {
         if (numFullWindows < minFullWindows)
             totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
 
-        return totalElapsedTimeMs;
+        // If window size is being calculated at the exact beginning of the window with no prior samples, the window size
+        // will result in a value of 0. Calculation of rate over a window of size 0 is undefined, hence, we assume the
+        // minimum window size to be at least 1ms.
+        return Math.max(totalElapsedTimeMs, 1);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 3dd114d9fd4..bc1fc5d9e56 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -45,6 +45,7 @@ import java.util.function.Function;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -607,15 +608,15 @@ public class MetricsTest {
         // Sleep for half the window.
         time.sleep(cfg.timeWindowMs() / 2);
 
-        // prior to any time passing
-        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
+        // prior to any time passing, elapsedSecs = sampleWindowSize * (total samples - half of final sample)
+        double elapsedSecs = MetricsUtils.convert(cfg.timeWindowMs(), TimeUnit.SECONDS) * (cfg.samples() - 0.5);
 
         KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
         KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName);
         assertEquals(sum / elapsedSecs, (Double) rateMetric.metricValue(), EPS, "Rate(0...2) = 2.666");
         assertEquals(count / elapsedSecs, (Double) countRateMetric.metricValue(), EPS, "Count rate(0...2) = 0.02666");
         assertEquals(elapsedSecs,
-                ((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS, "Elapsed Time = 75 seconds");
+                MetricsUtils.convert(((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()), TimeUnit.SECONDS), EPS, "Elapsed Time = 75 seconds");
         assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
         assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java
new file mode 100644
index 00000000000..04c5ca1292f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class RateTest {
+    private static final double EPS = 0.000001;
+    private Rate r;
+    private Time timeClock;
+
+    @BeforeEach
+    public void setup() {
+        r = new Rate();
+        timeClock = new MockTime();
+    }
+
+    // Tests the scenario where the recording and measurement is done before the window for first sample finishes
+    // with no prior samples retained.
+    @ParameterizedTest
+    @CsvSource({"1,1", "1,11", "11,1", "11,11"})
+    public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowSizeSec) {
+        final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS);
+        final double sampleValue = 50.0;
+        // record at beginning of the window
+        r.record(config, sampleValue, timeClock.milliseconds());
+        // forward time till almost the end of window
+        final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1;
+        timeClock.sleep(measurementTime);
+        // calculate rate at almost the end of window
+        final double observedRate = r.measure(config, timeClock.milliseconds());
+        assertFalse(Double.isNaN(observedRate));
+
+        // In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes
+        // presence of N-1 (where N = numSample) prior samples with sample values of 0. Hence, the window size for rate
+        // calculation accounts for N-1 prior samples
+        final int dummyPriorSamplesAssumedByAlgorithm = numSample - 1;
+        final double windowSize = MetricsUtils.convert(measurementTime, TimeUnit.SECONDS) + (dummyPriorSamplesAssumedByAlgorithm * sampleWindowSizeSec);
+        double expectedRatePerSec = sampleValue / windowSize;
+        assertEquals(expectedRatePerSec, observedRate, EPS);
+    }
+}