You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/19 14:29:29 UTC
[3/3] flink git commit: [FLINK-4586] [core] Broken AverageAccumulator
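[FLINK-4586] [core] Broken AverageAccumulator

AverageAccumulator.merge() added the other accumulator's average (the result of
getLocalValue()) to the running total instead of the other accumulator's sum, so
merged averages were generally wrong once the other accumulator held more than
one element. The accumulator now tracks the sum explicitly and merge() combines
both sum and count; the merge test is extended to use multiple elements per
accumulator.

This closes #2639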
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/428419d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/428419d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/428419d5
Branch: refs/heads/master
Commit: 428419d599d138f1647f84807d6d0224652f3d1b
Parents: cb78d70
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Oct 14 16:18:52 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:46:49 2016 -0400
----------------------------------------------------------------------
.../common/accumulators/AverageAccumulator.java | 27 ++++++++++----------
.../accumulators/AverageAccumulatorTest.java | 18 ++++++++-----
2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
index 9c0f62f..67cf572 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
@@ -28,29 +28,30 @@ import org.apache.flink.annotation.Public;
public class AverageAccumulator implements SimpleAccumulator<Double> {
private static final long serialVersionUID = 3672555084179165255L;
-
- private double localValue;
+
private long count;
+ private double sum;
+
@Override
public void add(Double value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(double value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(long value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(int value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
@Override
@@ -58,21 +59,21 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
if (this.count == 0) {
return 0.0;
}
- return this.localValue / (double)this.count;
+ return this.sum / this.count;
}
@Override
public void resetLocal() {
this.count = 0;
- this.localValue = 0;
+ this.sum = 0;
}
@Override
public void merge(Accumulator<Double, Double> other) {
if (other instanceof AverageAccumulator) {
- AverageAccumulator temp = (AverageAccumulator)other;
- this.count += temp.count;
- this.localValue += other.getLocalValue();
+ AverageAccumulator avg = (AverageAccumulator)other;
+ this.count += avg.count;
+ this.sum += avg.sum;
} else {
throw new IllegalArgumentException("The merged accumulator must be AverageAccumulator.");
}
@@ -81,13 +82,13 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
@Override
public AverageAccumulator clone() {
AverageAccumulator average = new AverageAccumulator();
- average.localValue = this.localValue;
average.count = this.count;
+ average.sum = this.sum;
return average;
}
@Override
public String toString() {
- return "AverageAccumulator " + this.localValue + " count " + this.count;
+ return "AverageAccumulator " + this.getLocalValue() + " for " + this.count + " elements";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
index 9ebd27c..585511f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
@@ -83,12 +83,18 @@ public class AverageAccumulatorTest {
@Test
public void testMergeSuccess() {
- AverageAccumulator average = new AverageAccumulator();
- AverageAccumulator averageNew = new AverageAccumulator();
- average.add(1);
- averageNew.add(2);
- average.merge(averageNew);
- assertEquals(1.5, average.getLocalValue(), 0.0);
+ AverageAccumulator avg1 = new AverageAccumulator();
+ for (int i = 0; i < 5; i++) {
+ avg1.add(i);
+ }
+
+ AverageAccumulator avg2 = new AverageAccumulator();
+ for (int i = 5; i < 10; i++) {
+ avg2.add(i);
+ }
+
+ avg1.merge(avg2);
+ assertEquals(4.5, avg1.getLocalValue(), 0.0);
}
@Test