You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/23 16:35:59 UTC

[GitHub] [flink] mxm commented on a diff in pull request #21374: [FLINK-30090] Support timespan for TimerGauges

mxm commented on code in PR #21374:
URL: https://github.com/apache/flink/pull/21374#discussion_r1030658450


##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java:
##########
@@ -50,11 +62,24 @@ public class TimerGauge implements Gauge<Long>, View {
     private long accumulatedCount;
 
     public TimerGauge() {
-        this(SystemClock.getInstance());
+        this(DEFAULT_TIME_SPAN_IN_SECONDS);
+    }
+
+    public TimerGauge(int timeSpanInSeconds) {
+        this(SystemClock.getInstance(), timeSpanInSeconds);
     }
 
     public TimerGauge(Clock clock) {
+        this(clock, DEFAULT_TIME_SPAN_IN_SECONDS);
+    }
+
+    public TimerGauge(Clock clock, int timeSpanInSeconds) {
         this.clock = clock;
+        this.timeSpanInSeconds =
+                Math.max(
+                        timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS),
+                        UPDATE_INTERVAL_SECONDS);

Review Comment:
   It would be kind of nice not having to adjust this but we are constrained by the UPDATE_INTERVAL_SECONDS which we can't easily change. So looks good.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java:
##########
@@ -32,9 +32,21 @@
  * happen in a couple of hours, the returned value will account for this ongoing measurement.
  */
 public class TimerGauge implements Gauge<Long>, View {
+
+    private static final int DEFAULT_TIME_SPAN_IN_SECONDS = 60;
+
     private final Clock clock;
 
-    private long previousCount;
+    /** The time-span over which the average is calculated. */
+    private final int timeSpanInSeconds;
+    /** Circular array containing the history of values. */
+    private final long[] values;
+    /** The index in the array for the current time. */
+    private int time = 0;

Review Comment:
   I think `idx` would be a better name for this. Time is confusing because this actually gets reset to zero when the array is full.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java:
##########
@@ -89,15 +114,32 @@ public synchronized void update() {
             currentMaxSingleMeasurement =
                     Math.max(currentMaxSingleMeasurement, now - currentMeasurementStartTS);
         }
-        previousCount = Math.max(Math.min(currentCount / UPDATE_INTERVAL_SECONDS, 1000), 0);
+        updateCurrentValue();
         previousMaxSingleMeasurement = currentMaxSingleMeasurement;
         currentCount = 0;
         currentMaxSingleMeasurement = 0;
     }
 
+    private void updateCurrentValue() {
+        if (time == values.length - 1) {
+            fullWindow = true;
+        }
+        values[time] = currentCount;
+        time = (time + 1) % values.length;
+
+        int maxIndex = fullWindow ? values.length : time;
+        long totalTime = 0;
+        for (int i = 0; i < maxIndex; i++) {
+            totalTime += values[i];
+        }

Review Comment:
   I appreciate the attention to detail here on properly calculating the totalTime based on the number of available observations :) Also, I very much prefer it over only releasing the Gauge once we have all observations. That way, this feature shouldn't cause any regressions because the Gauge as quickly as it does now but its accuracy will improve after a minute.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org