You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 20:27:27 UTC
[2/3] git commit: [FLINK-1096] Correction to histogram accumulator
[FLINK-1096] Correction to histogram accumulator
This closes #117
* each key is associated with the number of times it was inserted into the accumulator
* backed histogram with a tree map to present the entries sorted by key
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9463e27b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9463e27b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9463e27b
Branch: refs/heads/master
Commit: 9463e27bc5142b3f72ee26e5eb25de9dac1b0125
Parents: e17575b
Author: Sebastian Kruse <se...@hpi.de>
Authored: Wed Sep 10 18:59:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 21 19:51:07 2014 +0200
----------------------------------------------------------------------
.../api/common/accumulators/Histogram.java | 32 +++++++++-----------
.../test/accumulators/AccumulatorITCase.java | 2 +-
2 files changed, 16 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9463e27b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
index 3d72b91..fd9f554 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
@@ -29,7 +29,8 @@ import com.google.common.collect.Maps;
/**
* Histogram for discrete-data. Let's you populate a histogram distributedly.
- * Implemented as a Integer->Integer HashMap
+ * Implemented as an Integer->Integer TreeMap, so that the entries are sorted
+ * by key (i.e. by the values that were added to the histogram).
*
* Could be extended to continuous values later, but then we need to dynamically
* decide about the bin size in an online algorithm (or ask the user)
@@ -38,21 +39,18 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
private static final long serialVersionUID = 1L;
- private Map<Integer, Integer> hashMap = Maps.newHashMap();
+ private Map<Integer, Integer> treeMap = Maps.newTreeMap();
@Override
public void add(Integer value) {
- Integer current = hashMap.get(value);
- Integer newValue = value;
- if (current != null) {
- newValue = current + newValue;
- }
- this.hashMap.put(value, newValue);
+ Integer current = treeMap.get(value);
+ Integer newValue = (current != null ? current : 0) + 1;
+ this.treeMap.put(value, newValue);
}
@Override
public Map<Integer, Integer> getLocalValue() {
- return this.hashMap;
+ return this.treeMap;
}
@Override
@@ -60,29 +58,29 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
// Merge the values into this map
for (Map.Entry<Integer, Integer> entryFromOther : ((Histogram) other).getLocalValue()
.entrySet()) {
- Integer ownValue = this.hashMap.get(entryFromOther.getKey());
+ Integer ownValue = this.treeMap.get(entryFromOther.getKey());
if (ownValue == null) {
- this.hashMap.put(entryFromOther.getKey(), entryFromOther.getValue());
+ this.treeMap.put(entryFromOther.getKey(), entryFromOther.getValue());
} else {
- this.hashMap.put(entryFromOther.getKey(), entryFromOther.getValue() + ownValue);
+ this.treeMap.put(entryFromOther.getKey(), entryFromOther.getValue() + ownValue);
}
}
}
@Override
public void resetLocal() {
- this.hashMap.clear();
+ this.treeMap.clear();
}
@Override
public String toString() {
- return this.hashMap.toString();
+ return this.treeMap.toString();
}
@Override
public void write(DataOutputView out) throws IOException {
- out.writeInt(hashMap.size());
- for (Map.Entry<Integer, Integer> entry : hashMap.entrySet()) {
+ out.writeInt(treeMap.size());
+ for (Map.Entry<Integer, Integer> entry : treeMap.entrySet()) {
out.writeInt(entry.getKey());
out.writeInt(entry.getValue());
}
@@ -92,7 +90,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
public void read(DataInputView in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; ++i) {
- hashMap.put(in.readInt(), in.readInt());
+ treeMap.put(in.readInt(), in.readInt());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9463e27b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 5497004..f45e8be 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -107,7 +107,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
// Test histogram (words per line distribution)
Map<Integer, Integer> dist = Maps.newHashMap();
- dist.put(1, 1); dist.put(2, 2); dist.put(3, 3);
+ dist.put(1, 1); dist.put(2, 1); dist.put(3, 1);
Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
// Test distinct words (custom accumulator)