You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/03 15:25:03 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r449633334



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,139 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testStrictQuotaEnforcement() {
+        final Time time = new MockTime(0, 0, 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(10))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0s to bring the avg rate to 9. Value is accepted
+        // because the quota is not exhausted yet.
+        sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+        assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a second value at T+1s to bring the avg rate to 18. Value is accepted
+        // because the quota is not exhausted yet.
+        time.sleep(1000);
+        sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+        assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a third value at T+2s is rejected immediately and rate is not updated
+        // because the quota is exhausted.
+        time.sleep(1000);
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT));
+        assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    @Test
+    public void testPermissiveQuotaEnforcement() {
+        final Time time = new MockTime(0, 0, 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(10))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0s to bring the avg rate to 9. Value is accepted
+        // because the quota is not exhausted yet.
+        sensor.record(90, time.milliseconds(), QuotaEnforcementType.PERMISSIVE);
+        assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a second value at T+1s to bring the avg rate to 18. Value is accepted
+        // and rate is updated even though the quota is exhausted.
+        time.sleep(1000);
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(90, time.milliseconds(), QuotaEnforcementType.PERMISSIVE));
+        assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a second value at T+1s to bring the avg rate to 27. Value is accepted
+        // and rate is updated even though the quota is exhausted.
+        time.sleep(1000);
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(90, time.milliseconds(), QuotaEnforcementType.PERMISSIVE));
+        assertEquals(27, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithDefaultRate() {

Review comment:
       This test illustrate the problem that we are trying to resolve.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org