You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:11:04 UTC
[10/14] incubator-beam git commit: Make DoFnTester aggregator initialization idempotent
Make DoFnTester aggregator initialization idempotent
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/043ebeca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/043ebeca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/043ebeca
Branch: refs/heads/master
Commit: 043ebecacf7a8e96939b025afa8480c6df2f3b41
Parents: 2ab955d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Oct 21 13:35:29 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 21:04:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTester.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/043ebeca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 302bb02..7995719 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -543,6 +543,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
final String name,
final CombineFn<AinT, AccT, AoutT> combiner) {
+
Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
@Override
public void addValue(AinT value) {
@@ -561,7 +562,22 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return combiner;
}
};
- accumulators.put(name, combiner.createAccumulator());
+
+ // Aggregator instantiation is idempotent
+ if (accumulators.containsKey(name)) {
+ Class<?> currentAccumClass = accumulators.get(name).getClass();
+ Class<?> createAccumClass = combiner.createAccumulator().getClass();
+ checkState(
+ currentAccumClass.isAssignableFrom(createAccumClass),
+ "Aggregator %s already initialized with accumulator type %s "
+ + "but was re-initialized with accumulator type %s",
+ name,
+ currentAccumClass,
+ createAccumClass);
+
+ } else {
+ accumulators.put(name, combiner.createAccumulator());
+ }
return aggregator;
}