You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 20:27:27 UTC

[2/3] git commit: [FLINK-1096] Correction to histogram accumulator

[FLINK-1096] Correction to histogram accumulator

This closes #117
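* Each key is now associated with the number of times it was inserted into the accumulator (previously every insertion added the key's own value to its entry, so the stored number was not an occurrence count).
* Backed the histogram with a TreeMap so that its entries are presented sorted by key.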


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9463e27b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9463e27b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9463e27b

Branch: refs/heads/master
Commit: 9463e27bc5142b3f72ee26e5eb25de9dac1b0125
Parents: e17575b
Author: Sebastian Kruse <se...@hpi.de>
Authored: Wed Sep 10 18:59:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 21 19:51:07 2014 +0200

----------------------------------------------------------------------
 .../api/common/accumulators/Histogram.java      | 32 +++++++++-----------
 .../test/accumulators/AccumulatorITCase.java    |  2 +-
 2 files changed, 16 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9463e27b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
index 3d72b91..fd9f554 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
@@ -29,7 +29,8 @@ import com.google.common.collect.Maps;
 
 /**
  * Histogram for discrete-data. Let's you populate a histogram distributedly.
- * Implemented as a Integer->Integer HashMap
+ * Implemented as a Integer->Integer TreeMap, so that the entries are sorted
+ * according to the values.
  * 
  * Could be extended to continuous values later, but then we need to dynamically
  * decide about the bin size in an online algorithm (or ask the user)
@@ -38,21 +39,18 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
 
 	private static final long serialVersionUID = 1L;
 
-	private Map<Integer, Integer> hashMap = Maps.newHashMap();
+	private Map<Integer, Integer> treeMap = Maps.newTreeMap();
 
 	@Override
 	public void add(Integer value) {
-		Integer current = hashMap.get(value);
-		Integer newValue = value;
-		if (current != null) {
-			newValue = current + newValue;
-		}
-		this.hashMap.put(value, newValue);
+		Integer current = treeMap.get(value);
+		Integer newValue = (current != null ? current : 0) + 1;
+		this.treeMap.put(value, newValue);
 	}
 
 	@Override
 	public Map<Integer, Integer> getLocalValue() {
-		return this.hashMap;
+		return this.treeMap;
 	}
 
 	@Override
@@ -60,29 +58,29 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
 		// Merge the values into this map
 		for (Map.Entry<Integer, Integer> entryFromOther : ((Histogram) other).getLocalValue()
 				.entrySet()) {
-			Integer ownValue = this.hashMap.get(entryFromOther.getKey());
+			Integer ownValue = this.treeMap.get(entryFromOther.getKey());
 			if (ownValue == null) {
-				this.hashMap.put(entryFromOther.getKey(), entryFromOther.getValue());
+				this.treeMap.put(entryFromOther.getKey(), entryFromOther.getValue());
 			} else {
-				this.hashMap.put(entryFromOther.getKey(), entryFromOther.getValue() + ownValue);
+				this.treeMap.put(entryFromOther.getKey(), entryFromOther.getValue() + ownValue);
 			}
 		}
 	}
 
 	@Override
 	public void resetLocal() {
-		this.hashMap.clear();
+		this.treeMap.clear();
 	}
 
 	@Override
 	public String toString() {
-		return this.hashMap.toString();
+		return this.treeMap.toString();
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
-		out.writeInt(hashMap.size());
-		for (Map.Entry<Integer, Integer> entry : hashMap.entrySet()) {
+		out.writeInt(treeMap.size());
+		for (Map.Entry<Integer, Integer> entry : treeMap.entrySet()) {
 			out.writeInt(entry.getKey());
 			out.writeInt(entry.getValue());
 		}
@@ -92,7 +90,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
 	public void read(DataInputView in) throws IOException {
 		int size = in.readInt();
 		for (int i = 0; i < size; ++i) {
-			hashMap.put(in.readInt(), in.readInt());
+			treeMap.put(in.readInt(), in.readInt());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9463e27b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 5497004..f45e8be 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -107,7 +107,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		
 		// Test histogram (words per line distribution)
 		Map<Integer, Integer> dist = Maps.newHashMap();
-		dist.put(1, 1); dist.put(2, 2); dist.put(3, 3);
+		dist.put(1, 1); dist.put(2, 1); dist.put(3, 1);
 		Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
 		
 		// Test distinct words (custom accumulator)