You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/19 14:29:29 UTC

[3/3] flink git commit: [FLINK-4586] [core] Broken AverageAccumulator

[FLINK-4586] [core] Broken AverageAccumulator

This closes #2639


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/428419d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/428419d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/428419d5

Branch: refs/heads/master
Commit: 428419d599d138f1647f84807d6d0224652f3d1b
Parents: cb78d70
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Oct 14 16:18:52 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:46:49 2016 -0400

----------------------------------------------------------------------
 .../common/accumulators/AverageAccumulator.java | 27 ++++++++++----------
 .../accumulators/AverageAccumulatorTest.java    | 18 ++++++++-----
 2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
index 9c0f62f..67cf572 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
@@ -28,29 +28,30 @@ import org.apache.flink.annotation.Public;
 public class AverageAccumulator implements SimpleAccumulator<Double> {
 
 	private static final long serialVersionUID = 3672555084179165255L;
-	
-	private double localValue;
+
 	private long count;
 
+	private double sum;
+
 	@Override
 	public void add(Double value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	public void add(double value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	public void add(long value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	public void add(int value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	@Override
@@ -58,21 +59,21 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
 		if (this.count == 0) {
 			return 0.0;
 		}
-		return this.localValue / (double)this.count;
+		return this.sum / this.count;
 	}
 
 	@Override
 	public void resetLocal() {
 		this.count = 0;
-		this.localValue = 0;
+		this.sum = 0;
 	}
 
 	@Override
 	public void merge(Accumulator<Double, Double> other) {
 		if (other instanceof AverageAccumulator) {
-			AverageAccumulator temp = (AverageAccumulator)other;
-			this.count += temp.count;
-			this.localValue += other.getLocalValue();
+			AverageAccumulator avg = (AverageAccumulator)other;
+			this.count += avg.count;
+			this.sum += avg.sum;
 		} else {
 			throw new IllegalArgumentException("The merged accumulator must be AverageAccumulator.");
 		}
@@ -81,13 +82,13 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
 	@Override
 	public AverageAccumulator clone() {
 		AverageAccumulator average = new AverageAccumulator();
-		average.localValue = this.localValue;
 		average.count = this.count;
+		average.sum = this.sum;
 		return average;
 	}
 
 	@Override
 	public String toString() {
-		return "AverageAccumulator " + this.localValue + " count " + this.count;
+		return "AverageAccumulator " + this.getLocalValue() + " for " + this.count + " elements";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
index 9ebd27c..585511f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
@@ -83,12 +83,18 @@ public class AverageAccumulatorTest {
 
 	@Test
 	public void testMergeSuccess() {
-		AverageAccumulator average = new AverageAccumulator();
-		AverageAccumulator averageNew = new AverageAccumulator();
-		average.add(1);
-		averageNew.add(2);
-		average.merge(averageNew);
-		assertEquals(1.5, average.getLocalValue(), 0.0);
+		AverageAccumulator avg1 = new AverageAccumulator();
+		for (int i = 0; i < 5; i++) {
+			avg1.add(i);
+		}
+
+		AverageAccumulator avg2 = new AverageAccumulator();
+		for (int i = 5; i < 10; i++) {
+			avg2.add(i);
+		}
+
+		avg1.merge(avg2);
+		assertEquals(4.5, avg1.getLocalValue(), 0.0);
 	}
 
 	@Test