You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/22 23:03:57 UTC

[5/9] beam git commit: ReduceFnTester assertion for windows that have data buffered

ReduceFnTester assertion for windows that have data buffered


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/795760d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/795760d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/795760d3

Branch: refs/heads/master
Commit: 795760d370bcbe28e1f0ca373ad4c8c841e6e6b5
Parents: 1c1f239
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 12:53:15 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/SystemReduceFn.java   |  6 ++++++
 .../org/apache/beam/runners/core/ReduceFnTester.java   | 13 +++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index c189b0d..3144bd6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
@@ -103,6 +104,11 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound
     this.bufferTag = bufferTag;
   }
 
+  @VisibleForTesting
+  StateTag<? extends GroupingState<InputT, OutputT>> getBufferTag() {
+    return bufferTag;
+  }
+
   @Override
   public void processValue(ProcessValueContext c) throws Exception {
     c.state().access(bufferTag).add(c.value());

http://git-wip-us.apache.org/repos/asf/beam/blob/795760d3/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index ab9fd6e..1fe8f73 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -318,6 +318,19 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   }
 
   @SafeVarargs
+  public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) {
+    assertHasOnlyGlobalAndAllowedTags(
+        ImmutableSet.copyOf(expectedWindows),
+        ImmutableSet.<StateTag<?>>of(
+            ((SystemReduceFn<?, ?, ?, ?, ?>) reduceFn).getBufferTag(),
+            TriggerStateMachineRunner.FINISHED_BITS_TAG,
+            PaneInfoTracker.PANE_INFO_TAG,
+            WatermarkHold.watermarkHoldTagForTimestampCombiner(
+                objectStrategy.getTimestampCombiner()),
+            WatermarkHold.EXTRA_HOLD_TAG));
+  }
+
+  @SafeVarargs
   public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),