You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/21 17:35:47 UTC
[flink] branch release-1.14 updated: [FLINK-25904][metrics] Lazily initialize Percentile
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new f44982b [FLINK-25904][metrics] Lazily initialize Percentile
f44982b is described below
commit f44982b719457af56a03f7ae39e9bbf271fa2de4
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Mar 18 15:06:19 2022 +0100
[FLINK-25904][metrics] Lazily initialize Percentile
Percentile serialization doesn't work properly (see MATH-1642), so instead of serialize the data array and lazily initialize the Percentile as needed.
---
.../DescriptiveStatisticsHistogramStatistics.java | 29 ++++++++----
.../DescriptiveStatisticsHistogramTest.java | 55 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
index 12975b6..8eb5036 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.metrics;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.commons.math3.exception.MathIllegalArgumentException;
@@ -90,15 +91,16 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
* will not return a value but instead populate this class so that further values can be
* retrieved from it.
*/
- private static class CommonMetricsSnapshot implements UnivariateStatistic, Serializable {
- private static final long serialVersionUID = 1L;
+ @VisibleForTesting
+ static class CommonMetricsSnapshot implements UnivariateStatistic, Serializable {
+ private static final long serialVersionUID = 2L;
- private long count = 0;
+ private double[] data;
private double min = Double.NaN;
private double max = Double.NaN;
private double mean = Double.NaN;
private double stddev = Double.NaN;
- private Percentile percentilesImpl = new Percentile().withNaNStrategy(NaNStrategy.FIXED);
+ private transient Percentile percentilesImpl;
@Override
public double evaluate(final double[] values) throws MathIllegalArgumentException {
@@ -108,8 +110,7 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
@Override
public double evaluate(double[] values, int begin, int length)
throws MathIllegalArgumentException {
- this.count = length;
- percentilesImpl.setData(values, begin, length);
+ this.data = values;
SimpleStats secondMoment = new SimpleStats();
secondMoment.evaluate(values, begin, length);
@@ -125,17 +126,16 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
@Override
public CommonMetricsSnapshot copy() {
CommonMetricsSnapshot result = new CommonMetricsSnapshot();
- result.count = count;
+ result.data = Arrays.copyOf(data, data.length);
result.min = min;
result.max = max;
result.mean = mean;
result.stddev = stddev;
- result.percentilesImpl = percentilesImpl.copy();
return result;
}
long getCount() {
- return count;
+ return data.length;
}
double getMin() {
@@ -155,12 +155,23 @@ public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistic
}
double getPercentile(double p) {
+ maybeInitPercentile();
return percentilesImpl.evaluate(p);
}
double[] getValues() {
+ maybeInitPercentile();
return percentilesImpl.getData();
}
+
+ private void maybeInitPercentile() {
+ if (percentilesImpl == null) {
+ percentilesImpl = new Percentile().withNaNStrategy(NaNStrategy.FIXED);
+ }
+ if (data != null) {
+ percentilesImpl.setData(data);
+ }
+ }
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
index be1372f..30ca775 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
@@ -19,19 +19,74 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.metrics.AbstractHistogramTest;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionWithException;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.closeTo;
+import static org.junit.Assert.assertThat;
+
/**
* Tests for {@link DescriptiveStatisticsHistogram} and {@link
* DescriptiveStatisticsHistogramStatistics}.
*/
public class DescriptiveStatisticsHistogramTest extends AbstractHistogramTest {
+ private static final double[] DATA = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+
/** Tests the histogram functionality of the DropwizardHistogramWrapper. */
@Test
public void testDescriptiveHistogram() {
int size = 10;
testHistogram(size, new DescriptiveStatisticsHistogram(size));
}
+
+ /** Tests our workaround for https://issues.apache.org/jira/browse/MATH-1642. */
+ @Test
+ public void testSerialization() throws Exception {
+ testDuplication(
+ original -> {
+ final byte[] bytes = InstantiationUtil.serializeObject(original);
+ return (DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot)
+ InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
+ });
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ testDuplication(DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot::copy);
+ }
+
+ private static void testDuplication(
+ FunctionWithException<
+ DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot,
+ DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot,
+ Exception>
+ duplicator)
+ throws Exception {
+
+ DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot original =
+ new DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot();
+ original.evaluate(DATA);
+
+ assertOperations(original);
+
+ final DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot copy =
+ duplicator.apply(original);
+
+ assertOperations(copy);
+ }
+
+ private static void assertOperations(
+ DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot statistics) {
+ assertThat(statistics.getPercentile(0.5), equalTo(1.0));
+ assertThat(statistics.getCount(), equalTo(9L));
+ assertThat(statistics.getMin(), equalTo(1.0));
+ assertThat(statistics.getMax(), equalTo(9.0));
+ assertThat(statistics.getMean(), equalTo(5.0));
+ assertThat(statistics.getStandardDeviation(), closeTo(2.7, 0.5));
+ assertThat(statistics.getValues(), equalTo(DATA));
+ }
}