Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/13 11:04:01 UTC

[GitHub] [kafka] tombentley commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

tombentley commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r895546403


##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.

Review Comment:
   I found this a bit confusing. To me, "ring buffer" means a fixed-size list with mutable head and tail indices, but `samples` isn't like that. Rather, it's a list of mutable samples, and `Sample.reset` is used to overwrite old samples. The effect is the same, of course, but I wonder if we could revise the comment:
   
   ```suggestion
        * Stores the recorded samples. Older samples are overwritten using {@link Sample#reset(long)}.
   ```
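
For illustration, here is a minimal sketch of the "list of mutable samples overwritten via reset" pattern described above. `SampleReuseDemo`, its simplified `Sample`, `maxSamples`, and `size()` are hypothetical stand-ins, not the actual `SampledStat` code; the sketch only assumes that the list grows up to a fixed number of entries and that existing entries are then reused in place.

```java
import java.util.ArrayList;
import java.util.List;

class SampleReuseDemo {
    // Hypothetical, simplified stand-in for SampledStat.Sample.
    static final class Sample {
        double value;
        long lastWindowMs;

        Sample(long now) {
            this.lastWindowMs = now;
        }

        // Overwrites this sample in place so the same object can be reused.
        void reset(long now) {
            this.value = 0.0;
            this.lastWindowMs = now;
        }
    }

    private final int maxSamples; // assumed fixed upper bound on stored samples
    private final List<Sample> samples = new ArrayList<>();
    private int current = 0;      // index of the latest stored sample

    SampleReuseDemo(int maxSamples) {
        this.maxSamples = maxSamples;
    }

    // Moves to the next slot. The list grows until it holds maxSamples entries;
    // after that, the existing Sample object at that slot is reset and reused.
    Sample advance(long now) {
        if (!samples.isEmpty())
            current = (current + 1) % maxSamples;
        if (current >= samples.size())
            samples.add(new Sample(now));
        else
            samples.get(current).reset(now);
        return samples.get(current);
    }

    int size() {
        return samples.size();
    }

    public static void main(String[] args) {
        SampleReuseDemo stat = new SampleReuseDemo(2);
        Sample first = stat.advance(0L);
        stat.advance(100L);
        Sample third = stat.advance(200L);
        System.out.println(first == third);     // true: the first object was reused
        System.out.println(third.lastWindowMs); // 200
        System.out.println(stat.size());        // 2
    }
}
```

There are no separate head and tail indices here, only one `current` index and in-place resets, which is the distinction the comment draws against a classic ring buffer.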



##########
clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java:
##########
@@ -608,14 +609,14 @@ public void testRateWindowing() throws Exception {
         time.sleep(cfg.timeWindowMs() / 2);
 
         // prior to any time passing
-        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
+        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + (((double) cfg.timeWindowMs()) / 2.0d)) / 1000.0d;

Review Comment:
   If we divide by the literal `2.0d`, the cast of `cfg.timeWindowMs()` to `double` isn't needed.
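
A small sketch of the point above, assuming `timeWindowMs` is a `long` as in `MetricConfig`. The class name, method names, and the value `1001L` are hypothetical and chosen only to make the truncation visible.

```java
class HalfWindowDemo {
    // Both operands are integral, so the division truncates before widening to double.
    static double halfWindowTruncated(long timeWindowMs) {
        return timeWindowMs / 2;
    }

    // The double literal promotes timeWindowMs to double before dividing; no cast needed.
    static double halfWindowExact(long timeWindowMs) {
        return timeWindowMs / 2.0d;
    }

    // The explicit cast from the diff; it yields the same value as halfWindowExact.
    static double halfWindowWithCast(long timeWindowMs) {
        return ((double) timeWindowMs) / 2.0d;
    }

    public static void main(String[] args) {
        long timeWindowMs = 1001L; // hypothetical odd window length
        System.out.println(halfWindowTruncated(timeWindowMs)); // 500.0
        System.out.println(halfWindowExact(timeWindowMs));     // 500.5
        System.out.println(halfWindowWithCast(timeWindowMs));  // 500.5
    }
}
```

The same binary numeric promotion applies whenever one operand of `/` is already a `double`.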



##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##########
@@ -68,24 +68,55 @@ public double measure(MetricConfig config, long now) {
     }
 
     public long windowSize(MetricConfig config, long now) {
-        // purge old samples before we compute the window size
+        // Purge obsolete samples. Obsolete samples are the ones which are not relevant to the current calculation
+        // because their creation time is outside (before) the duration of time window used to calculate rate.
         stat.purgeObsoleteSamples(config, now);
 
         /*
          * Here we check the total amount of time elapsed since the oldest non-obsolete window.
-         * This give the total windowSize of the batch which is the time used for Rate computation.
-         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
-         * window, the measured rate will be very high.
-         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         * This gives the duration of computation time window which used to calculate Rate.

Review Comment:
   ```suggestion
            * This gives the duration of computation time window which is used to calculate Rate.
   ```



##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -84,13 +106,7 @@ public Sample current(long timeMs) {
     public Sample oldest(long now) {
         if (samples.size() == 0)
             this.samples.add(newSample(now));
-        Sample oldest = this.samples.get(0);
-        for (int i = 1; i < this.samples.size(); i++) {
-            Sample curr = this.samples.get(i);
-            if (curr.lastWindowMs < oldest.lastWindowMs)
-                oldest = curr;
-        }
-        return oldest;
+        return samples.stream().min(Comparator.comparingLong(s -> s.lastWindowMs)).orElse(samples.get(0));

Review Comment:
   I wonder whether this is worth doing. I know it's shorter, but I think it will be slower, at least until the JIT optimizes it.
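
For comparison, a self-contained sketch of the two forms side by side. `OldestSampleDemo` and its `Sample` class are hypothetical stand-ins that carry only the `lastWindowMs` field, and both methods assume a non-empty list. The sketch checks only that the two forms pick the same sample; it does not measure speed, which would need a proper benchmark harness such as JMH.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OldestSampleDemo {
    // Hypothetical stand-in for SampledStat.Sample; only the field used here.
    static final class Sample {
        final long lastWindowMs;

        Sample(long lastWindowMs) {
            this.lastWindowMs = lastWindowMs;
        }
    }

    // Explicit loop, mirroring the removed lines.
    static Sample oldestLoop(List<Sample> samples) {
        Sample oldest = samples.get(0);
        for (int i = 1; i < samples.size(); i++) {
            Sample curr = samples.get(i);
            if (curr.lastWindowMs < oldest.lastWindowMs)
                oldest = curr;
        }
        return oldest;
    }

    // Stream form, mirroring the added line.
    static Sample oldestStream(List<Sample> samples) {
        return samples.stream()
                .min(Comparator.comparingLong((Sample s) -> s.lastWindowMs))
                .orElse(samples.get(0));
    }

    public static void main(String[] args) {
        List<Sample> samples = new ArrayList<>();
        samples.add(new Sample(30L));
        samples.add(new Sample(10L));
        samples.add(new Sample(20L));
        System.out.println(oldestLoop(samples).lastWindowMs);             // 10
        System.out.println(oldestLoop(samples) == oldestStream(samples)); // true
    }
}
```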



##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.
+     */
     protected List<Sample> samples;
 
     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
         this.samples = new ArrayList<>(2);
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * On every record, do the following:
+     * 1. Check if the current window has expired
+     * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest
+     *    possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals
+     *    from the end time of last known window.
+     * 3. Update the recorded value for the current window
+     * 4. Increase the number of event count
+     */
     @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample sample = current(timeMs);
-        if (sample.isComplete(timeMs, config))
-            sample = advance(config, timeMs);
-        update(sample, config, value, timeMs);
-        sample.eventCount += 1;
+    public void record(MetricConfig config, double value, long recordingTimeMs) {
+        Sample sample = current(recordingTimeMs);
+        if (sample.isComplete(recordingTimeMs, config)) {
+            final long previousWindowStartTime = sample.lastWindowMs;
+            final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
+            final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());

Review Comment:
   `recordingTimeMs` seems to usually come from `Time.milliseconds`, thus from `System.currentTimeMillis`:
   
   * It's therefore not guaranteed to be monotonic.
   * This could be exacerbated by the `synchronized` blocks in `Sensor.recordInternal`, because `synchronized` provides no fairness guarantee for blocked threads.
   
   `sample.isComplete(recordingTimeMs, config)` could return true based on the number of samples, not the time.
   
   So I _think_ it's possible that `recordingTimeMs < previousWindowEndtime`, so that `startTimeOfNewWindow` ends up ahead of `recordingTimeMs`, which I don't think is intended. If it is intended, it definitely deserves a comment.
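
A worked sketch of that concern, using the same arithmetic as the added lines in `record()`. The class name and the concrete timestamps are hypothetical. Java's `%` takes the sign of the dividend, so when `recordingTimeMs` is earlier than `previousWindowEndtime` the remainder is negative and subtracting it moves the result forward in time.

```java
class WindowStartDemo {
    // Same arithmetic as in the diff, extracted into a standalone method.
    static long startTimeOfNewWindow(long previousWindowStartTime, long timeWindowMs, long recordingTimeMs) {
        final long previousWindowEndtime = previousWindowStartTime + timeWindowMs;
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % timeWindowMs);
    }

    public static void main(String[] args) {
        long previousWindowStartTime = 0L; // hypothetical values throughout
        long timeWindowMs = 1000L;

        // Expected case: the record arrives after the previous window ended.
        // (2500 - 1000) % 1000 = 500, so the new window starts at 2000, before the record.
        System.out.println(startTimeOfNewWindow(previousWindowStartTime, timeWindowMs, 2500L)); // 2000

        // Concerning case: the record arrives before the previous window ended,
        // e.g. because the sample was completed early or the clock stepped back.
        // (400 - 1000) % 1000 = -600, so the new window starts at 1000, after the record.
        System.out.println(startTimeOfNewWindow(previousWindowStartTime, timeWindowMs, 400L)); // 1000
    }
}
```

In the second case the computed start equals `previousWindowEndtime`, which is later than the recording time itself.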



##########
clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java:
##########
@@ -149,13 +149,14 @@ private void verifyStats(Function<KafkaMetric, Double> metricValueFunc) {
 
         assertEquals(5.0, metricValueFunc.apply(metrics.metric(metrics.metricName("s2.total", "grp1"))), EPS,
             "s2 reflects the constant value");
-        assertEquals(4.5, metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), EPS,
+        assertEquals(sum / (double) count, metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), EPS,
             "Avg(0...9) = 4.5");
         assertEquals(count - 1,  metricValueFunc.apply(metrics.metric(metrics.metricName("test.max", "grp1"))), EPS,
             "Max(0...9) = 9");
         assertEquals(0.0, metricValueFunc.apply(metrics.metric(metrics.metricName("test.min", "grp1"))), EPS,
             "Min(0...9) = 0");
-        assertEquals(sum / elapsedSecs, metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), EPS,
+        // rate is calculated over the first ever window. Hence, we assume presence of prior windows with 0 recorded events.
+        assertEquals((double) sum / elapsedSecs, metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), EPS,

Review Comment:
   `elapsedSecs` is a `double`, so the type cast is not needed (per https://docs.oracle.com/javase/specs/jls/se7/html/jls-5.html#jls-5.6.2).


