You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/21 17:35:47 UTC

[flink] branch release-1.14 updated: [FLINK-25904][metrics] Lazily initialize Percentile

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new f44982b  [FLINK-25904][metrics] Lazily initialize Percentile
f44982b is described below

commit f44982b719457af56a03f7ae39e9bbf271fa2de4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Mar 18 15:06:19 2022 +0100

    [FLINK-25904][metrics] Lazily initialize Percentile
    
    Percentile serialization doesn't work properly (see MATH-1642), so instead we serialize the data array and lazily initialize the Percentile as needed.
---
 .../DescriptiveStatisticsHistogramStatistics.java  | 29 ++++++++----
 .../DescriptiveStatisticsHistogramTest.java        | 55 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
index 12975b6..8eb5036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.metrics;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.HistogramStatistics;
 
 import org.apache.commons.math3.exception.MathIllegalArgumentException;
@@ -90,15 +91,16 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
      * will not return a value but instead populate this class so that further values can be
      * retrieved from it.
      */
-    private static class CommonMetricsSnapshot implements UnivariateStatistic, Serializable {
-        private static final long serialVersionUID = 1L;
+    @VisibleForTesting
+    static class CommonMetricsSnapshot implements UnivariateStatistic, Serializable {
+        private static final long serialVersionUID = 2L;
 
-        private long count = 0;
+        private double[] data;
         private double min = Double.NaN;
         private double max = Double.NaN;
         private double mean = Double.NaN;
         private double stddev = Double.NaN;
-        private Percentile percentilesImpl = new Percentile().withNaNStrategy(NaNStrategy.FIXED);
+        private transient Percentile percentilesImpl;
 
         @Override
         public double evaluate(final double[] values) throws MathIllegalArgumentException {
@@ -108,8 +110,7 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
         @Override
         public double evaluate(double[] values, int begin, int length)
                 throws MathIllegalArgumentException {
-            this.count = length;
-            percentilesImpl.setData(values, begin, length);
+            this.data = values;
 
             SimpleStats secondMoment = new SimpleStats();
             secondMoment.evaluate(values, begin, length);
@@ -125,17 +126,16 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
         @Override
         public CommonMetricsSnapshot copy() {
             CommonMetricsSnapshot result = new CommonMetricsSnapshot();
-            result.count = count;
+            result.data = Arrays.copyOf(data, data.length);
             result.min = min;
             result.max = max;
             result.mean = mean;
             result.stddev = stddev;
-            result.percentilesImpl = percentilesImpl.copy();
             return result;
         }
 
         long getCount() {
-            return count;
+            return data.length;
         }
 
         double getMin() {
@@ -155,12 +155,23 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
         }
 
         double getPercentile(double p) {
+            maybeInitPercentile();
             return percentilesImpl.evaluate(p);
         }
 
         double[] getValues() {
+            maybeInitPercentile();
             return percentilesImpl.getData();
         }
+
+        private void maybeInitPercentile() {
+            if (percentilesImpl == null) {
+                percentilesImpl = new Percentile().withNaNStrategy(NaNStrategy.FIXED);
+            }
+            if (data != null) {
+                percentilesImpl.setData(data);
+            }
+        }
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
index be1372f..30ca775 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
@@ -19,19 +19,74 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.metrics.AbstractHistogramTest;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.closeTo;
+import static org.junit.Assert.assertThat;
+
 /**
  * Tests for {@link DescriptiveStatisticsHistogram} and {@link
  * DescriptiveStatisticsHistogramStatistics}.
  */
 public class DescriptiveStatisticsHistogramTest extends AbstractHistogramTest {
 
+    private static final double[] DATA = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+
     /** Tests the histogram functionality of the DropwizardHistogramWrapper. */
     @Test
     public void testDescriptiveHistogram() {
         int size = 10;
         testHistogram(size, new DescriptiveStatisticsHistogram(size));
     }
+
+    /** Tests our workaround for https://issues.apache.org/jira/browse/MATH-1642. */
+    @Test
+    public void testSerialization() throws Exception {
+        testDuplication(
+                original -> {
+                    final byte[] bytes = InstantiationUtil.serializeObject(original);
+                    return (DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot)
+                            InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
+                });
+    }
+
+    @Test
+    public void testCopy() throws Exception {
+        testDuplication(DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot::copy);
+    }
+
+    private static void testDuplication(
+            FunctionWithException<
+                            DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot,
+                            DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot,
+                            Exception>
+                    duplicator)
+            throws Exception {
+
+        DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot original =
+                new DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot();
+        original.evaluate(DATA);
+
+        assertOperations(original);
+
+        final DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot copy =
+                duplicator.apply(original);
+
+        assertOperations(copy);
+    }
+
+    private static void assertOperations(
+            DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot statistics) {
+        assertThat(statistics.getPercentile(0.5), equalTo(1.0));
+        assertThat(statistics.getCount(), equalTo(9L));
+        assertThat(statistics.getMin(), equalTo(1.0));
+        assertThat(statistics.getMax(), equalTo(9.0));
+        assertThat(statistics.getMean(), equalTo(5.0));
+        assertThat(statistics.getStandardDeviation(), closeTo(2.7, 0.5));
+        assertThat(statistics.getValues(), equalTo(DATA));
+    }
 }