You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/22 23:03:57 UTC
[5/9] beam git commit: ReduceFnTester assertion for windows that have
data buffered
ReduceFnTester assertion for windows that have data buffered
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/795760d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/795760d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/795760d3
Branch: refs/heads/master
Commit: 795760d370bcbe28e1f0ca373ad4c8c841e6e6b5
Parents: 1c1f239
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:53:15 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/runners/core/SystemReduceFn.java | 6 ++++++
.../org/apache/beam/runners/core/ReduceFnTester.java | 13 +++++++++++++
2 files changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index c189b0d..3144bd6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
@@ -103,6 +104,11 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
this.bufferTag = bufferTag;
}
+ @VisibleForTesting
+ StateTag<? extends GroupingState<InputT, OutputT>> getBufferTag() {
+ return bufferTag;
+ }
+
@Override
public void processValue(ProcessValueContext c) throws Exception {
c.state().access(bufferTag).add(c.value());
http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index ab9fd6e..1fe8f73 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -318,6 +318,19 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
}
@SafeVarargs
+ public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) {
+ assertHasOnlyGlobalAndAllowedTags(
+ ImmutableSet.copyOf(expectedWindows),
+ ImmutableSet.<StateTag<?>>of(
+ ((SystemReduceFn<?, ?, ?, ?, ?>) reduceFn).getBufferTag(),
+ TriggerStateMachineRunner.FINISHED_BITS_TAG,
+ PaneInfoTracker.PANE_INFO_TAG,
+ WatermarkHold.watermarkHoldTagForTimestampCombiner(
+ objectStrategy.getTimestampCombiner()),
+ WatermarkHold.EXTRA_HOLD_TAG));
+ }
+
+ @SafeVarargs
public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),