You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/03 05:08:05 UTC

[1/2] incubator-beam git commit: Use CommittedResult in InMemoryWatermarkManager

Repository: incubator-beam
Updated Branches:
  refs/heads/master 69ec223e5 -> 34e05954d


Use CommittedResult in InMemoryWatermarkManager

This enable unprocessed elements to be handled in the Watermark manager
after they are added to the CommittedResult structure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dff82cae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dff82cae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dff82cae

Branch: refs/heads/master
Commit: dff82cae2f9d6393e4bdbb7fd527f58eb2cdaa01
Parents: 659cf2e
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 28 13:42:36 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 2 12:54:02 2016 -0700

----------------------------------------------------------------------
 .../direct/InMemoryWatermarkManager.java        |  19 +-
 .../direct/InProcessEvaluationContext.java      |   6 +-
 .../direct/InMemoryWatermarkManagerTest.java    | 368 ++++++++++++-------
 3 files changed, 252 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 769457a..4d5a3a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -800,18 +800,19 @@ public class InMemoryWatermarkManager {
    * </pre>.
    *
    * @param completed the input that has completed
-   * @param transform the transform that has completed processing the input
-   * @param outputs the bundles the transform has output
+   * @param timerUpdate the timers that were added, removed, and completed as part of producing
+   *                    this update
+   * @param result the result that was produced by processing the input
    * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
    *                     is no hold
    */
   public void updateWatermarks(
       @Nullable CommittedBundle<?> completed,
-      AppliedPTransform<?, ?, ?> transform,
       TimerUpdate timerUpdate,
-      Iterable<? extends CommittedBundle<?>> outputs,
+      CommittedResult result,
       @Nullable Instant earliestHold) {
-    updatePending(completed, transform, timerUpdate, outputs);
+    AppliedPTransform<?, ?, ?> transform = result.getTransform();
+    updatePending(completed, timerUpdate, result);
     TransformWatermarks transformWms = transformToWatermarks.get(transform);
     transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold);
     refreshWatermarks(transform);
@@ -846,15 +847,14 @@ public class InMemoryWatermarkManager {
    */
   private void updatePending(
       CommittedBundle<?> input,
-      AppliedPTransform<?, ?, ?> transform,
       TimerUpdate timerUpdate,
-      Iterable<? extends CommittedBundle<?>> outputs) {
-    TransformWatermarks completedTransform = transformToWatermarks.get(transform);
+      CommittedResult result) {
+    TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
 
     // Newly pending elements must be added before completed elements are removed, as the two
     // do not share a Mutex within this call and thus can be interleaved with external calls to
     // refresh.
-    for (CommittedBundle<?> bundle : outputs) {
+    for (CommittedBundle<?> bundle : result.getOutputs()) {
       for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
         TransformWatermarks watermarks = transformToWatermarks.get(consumer);
         watermarks.addPending(bundle);
@@ -865,7 +865,6 @@ public class InMemoryWatermarkManager {
     if (input != null) {
       completedTransform.removePending(input);
     }
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d9a7ff0..d4f891e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -152,11 +152,11 @@ class InProcessEvaluationContext {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
     // Update watermarks and timers
+    CommittedResult committedResult = CommittedResult.create(result, committedBundles);
     watermarkManager.updateWatermarks(
         completedBundle,
-        result.getTransform(),
         result.getTimerUpdate().withCompletedTimers(completedTimers),
-        committedBundles,
+        committedResult,
         result.getWatermarkHold());
     fireAllAvailableCallbacks();
     // Update counters
@@ -176,7 +176,7 @@ class InProcessEvaluationContext {
         applicationStateInternals.remove(stepAndKey);
       }
     }
-    return CommittedResult.create(result, committedBundles);
+    return committedResult;
   }
 
   private Iterable<? extends CommittedBundle<?>> commitBundles(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 2880ade..15cdf8a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -159,8 +159,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void getWatermarkForUpdatedSourceTransform() {
     CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(output), new Instant(8000L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(output)),
+        new Instant(8000L));
     TransformWatermarks updatedSourceWatermark =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
 
@@ -175,8 +178,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   public void getWatermarkForMultiInputTransform() {
     CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1);
 
-    manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(intsToFlatten.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // We didn't do anything for the first source, so we shouldn't have progressed the watermark
@@ -205,13 +210,17 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1);
     // We have finished processing the bundle from the second PCollection, but we haven't consumed
     // anything from the first PCollection yet; so our watermark shouldn't advance
-    manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
+    manager.updateWatermarks(secondPcollectionBundle,
+        TimerUpdate.empty(),
+        result(flattened.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         null);
     TransformWatermarks transformAfterProcessing =
         manager.getWatermarks(flattened.getProducingTransformInternal());
-    manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
+    manager.updateWatermarks(secondPcollectionBundle,
+        TimerUpdate.empty(),
+        result(flattened.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         null);
     assertThat(
         transformAfterProcessing.getInputWatermark(),
@@ -225,8 +234,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         timestampedBundle(createdInts, TimestampedValue.<Integer>of(5, firstCollectionTimestamp));
     // the source is done, but elements are still buffered. The source output watermark should be
     // past the end of the global window
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
         new Instant(Long.MAX_VALUE));
     TransformWatermarks firstSourceWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -253,8 +264,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     CommittedBundle<?> completedFlattenBundle =
         bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
+    manager.updateWatermarks(firstPcollectionBundle,
+        TimerUpdate.empty(),
+        result(flattened.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
         null);
     TransformWatermarks afterConsumingAllInput =
         manager.getWatermarks(flattened.getProducingTransformInternal());
@@ -275,8 +288,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
         TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
         TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        new Instant(Long.MAX_VALUE));
     TransformWatermarks createdAfterProducing =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(
@@ -287,8 +303,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
             TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
             TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null);
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+            null);
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(
@@ -303,8 +322,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     CommittedBundle<Integer> filteredBundle =
         timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
-    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filteredBundle), null);
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
+        null);
     TransformWatermarks filteredProcessedWatermarks =
         manager.getWatermarks(filtered.getProducingTransformInternal());
     assertThat(
@@ -322,17 +344,23 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void updateWatermarkWithWatermarkHolds() {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
+        TimestampedValue.of(1, new Instant(1_000_000L)),
+        TimestampedValue.of(2, new Instant(1234L)),
         TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        new Instant(Long.MAX_VALUE));
 
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle),
+    CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
+        TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
+        TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
+        TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -358,40 +386,54 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
             .commit(clock.now());
 
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            ImmutableList.of(firstKeyBundle, secondKeyBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(-1000L));
-    manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(1234L));
+    manager.updateWatermarks(firstKeyBundle,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        new Instant(-1000L));
+    manager.updateWatermarks(secondKeyBundle,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        new Instant(1234L));
 
     TransformWatermarks filteredWatermarks =
         manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        filteredWatermarks.getInputWatermark(),
+    assertThat(filteredWatermarks.getInputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
 
     CommittedBundle<Integer> fauxFirstKeyTimerBundle =
         bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now());
-    manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+    manager.updateWatermarks(fauxFirstKeyTimerBundle,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
 
     CommittedBundle<Integer> fauxSecondKeyTimerBundle =
         bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now());
-    manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L));
+    manager.updateWatermarks(fauxSecondKeyTimerBundle,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        new Instant(5678L));
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
 
-    manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+    manager.updateWatermarks(fauxSecondKeyTimerBundle,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(
-        filteredWatermarks.getOutputWatermark(),
+    assertThat(filteredWatermarks.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
   }
 
@@ -403,16 +445,21 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   public void updateOutputWatermarkShouldBeMonotonic() {
     CommittedBundle<?> firstInput =
         bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L));
+    manager.updateWatermarks(null,  TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(firstInput)),
+        new Instant(0L));
     TransformWatermarks firstWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
 
     CommittedBundle<?> secondInput =
         bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(secondInput)),
+        new Instant(-250L));
     TransformWatermarks secondWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L))));
@@ -425,17 +472,22 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void updateWatermarkWithHoldsShouldBeMonotonic() {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
-        TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+        TimestampedValue.of(1, new Instant(1_000_000L)),
+            TimestampedValue.of(2, new Instant(1234L)),
+            TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        new Instant(Long.MAX_VALUE));
 
     CommittedBundle<KV<String, Integer>> keyBundle =
         timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
             TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
             TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle),
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -462,16 +514,22 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
         TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L)));
 
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), sourceWatermark);
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        sourceWatermark);
 
     CommittedBundle<KV<String, Integer>> keyBundle =
         timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark),
             TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
 
     // Finish processing the on-time data. The watermarks should progress to be equal to the source
-    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null);
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+        null);
     TransformWatermarks onTimeWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
@@ -481,8 +539,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L)));
     // the late data arrives in a downstream PCollection after its watermark has advanced past it;
     // we don't advance the watermark past the current watermark until we've consumed the late data
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(lateDataBundle), new Instant(2_000_000L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
+        new Instant(2_000_000L));
     TransformWatermarks bufferedLateWm =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
@@ -496,30 +557,31 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     CommittedBundle<KV<String, Integer>> lateKeyedBundle =
         timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(lateKeyedBundle), null);
+    manager.updateWatermarks(lateDataBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
+        null);
   }
 
   public void updateWatermarkWithDifferentWindowedValueInstances() {
     manager.updateWatermarks(
         null,
-        createdInts.getProducingTransformInternal(),
         TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
                 .createRootBundle(createdInts)
                 .add(WindowedValue.valueInGlobalWindow(1))
-                .commit(Instant.now())),
+                .commit(Instant.now()))),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(
-        bundleFactory
-            .createRootBundle(createdInts)
+        bundleFactory.createRootBundle(createdInts)
             .add(WindowedValue.valueInGlobalWindow(1))
             .commit(Instant.now()),
-        keyed.getProducingTransformInternal(),
         TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(),
+        result(keyed.getProducingTransformInternal(), Collections.<CommittedBundle<?>>emptyList()),
         null);
     TransformWatermarks onTimeWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -533,8 +595,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void getWatermarksAfterOnlyEmptyOutput() {
     CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks updatedSourceWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -560,12 +624,17 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void getWatermarksAfterHoldAndEmptyOutput() {
     CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(firstCreateOutput), new Instant(12_000L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
+        new Instant(12_000L));
 
     CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
-    manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
+    manager.updateWatermarks(firstCreateOutput,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
         new Instant(10_000L));
     TransformWatermarks firstFilterWatermarks =
         manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -573,8 +642,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
 
     CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks updatedSourceWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
@@ -613,8 +684,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createOutput =
         bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
 
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createOutput)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks createAfterUpdate =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
@@ -639,8 +713,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     CommittedBundle<?> filterOutputBundle =
         bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L));
-    manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
+    manager.updateWatermarks(createOutput,
+        TimerUpdate.empty(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks filterAfterConsumed =
         manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -661,8 +737,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   //  @Test
   public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(1248L));
+    manager.updateWatermarks(null,  TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        new Instant(1248L));
 
     TransformWatermarks filteredWms =
         manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -678,8 +756,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
     TimerUpdate timers =
         TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build();
-    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers,
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
+    manager.updateWatermarks(createdBundle,
+        timers,
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     Instant startTime = clock.now();
     clock.set(startTime.plus(250L));
@@ -712,11 +792,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo)
             .commit(filteredWms.getSynchronizedProcessingOutputTime());
     // Complete the processing time timer
-    manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(),
+    manager.updateWatermarks(filteredTimerBundle,
         TimerUpdate.builder("key")
-            .withCompletedTimers(Collections.<TimerData>singleton(pastTimer))
-            .build(),
-        Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
+            .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     clock.set(startTime.plus(500L));
@@ -726,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         filteredDoubledWms.getSynchronizedProcessingOutputTime(),
         not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark())));
 
-    manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+    manager.updateWatermarks(filteredTimerResult,
+        TimerUpdate.empty(),
+        result(filteredTimesTwo.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
 
@@ -761,18 +843,23 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createOutput =
         bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
 
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createOutput)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks createAfterUpdate =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
-    assertThat(
-        createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
+    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(),
+        not(laterThan(clock.now())));
 
     CommittedBundle<Integer> createSecondOutput =
         bundleFactory.createRootBundle(createdInts).commit(new Instant(750L));
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createSecondOutput),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -781,16 +868,20 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
     CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(created), new Instant(40_900L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(created)),
+        new Instant(40_900L));
 
     CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
     Instant upstreamHold = new Instant(2048L);
     TimerData upstreamProcessingTimer =
         TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
-    manager.updateWatermarks(created, filtered.getProducingTransformInternal(),
+    manager.updateWatermarks(created,
         TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     TransformWatermarks downstreamWms =
@@ -806,11 +897,12 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
 
     CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
-    manager.updateWatermarks(otherCreated, filtered.getProducingTransformInternal(),
+    manager.updateWatermarks(otherCreated,
         TimerUpdate.builder("key")
-            .withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
-            .build(),
-        Collections.<CommittedBundle<?>>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+            .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now())));
   }
@@ -820,9 +912,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
     manager.updateWatermarks(
         null,
-        createdInts.getProducingTransformInternal(),
         TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(created),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(29_919_235L));
 
     Instant upstreamHold = new Instant(2048L);
@@ -830,9 +922,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold);
     manager.updateWatermarks(
         created,
-        filtered.getProducingTransformInternal(),
         TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     TransformWatermarks downstreamWms =
@@ -852,8 +944,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.singleton(createdBundle), new Instant(1500L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+        new Instant(1500L));
 
     TimerData earliestTimer =
         TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME);
@@ -869,11 +963,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             .setTimer(lastTimer)
             .build();
 
-    manager.updateWatermarks(
-        createdBundle,
-        filtered.getProducingTransformInternal(),
+    manager.updateWatermarks(createdBundle,
         update,
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -886,8 +979,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     FiredTimers firstFired = firstFilteredTimers.get(key);
     assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer));
 
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
@@ -909,8 +1005,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.singleton(createdBundle), new Instant(1500L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+        new Instant(1500L));
 
     TimerData earliestTimer =
         TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME);
@@ -928,9 +1026,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(
         createdBundle,
-        filtered.getProducingTransformInternal(),
         update,
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        result(filtered.getProducingTransformInternal(),
+        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -944,8 +1042,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer));
 
     clock.set(new Instant(50_000L));
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
@@ -967,8 +1068,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     // Advance WM of keyed past the first timer, but ahead of the second and third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.singleton(createdBundle), new Instant(1500L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+        new Instant(1500L));
 
     TimerData earliestTimer = TimerData.of(
         StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
@@ -986,9 +1089,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(
         createdBundle,
-        filtered.getProducingTransformInternal(),
         update,
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)),
+        result(filtered.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -1003,8 +1106,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer));
 
     clock.set(new Instant(50_000L));
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            Collections.<CommittedBundle<?>>emptyList()),
+        new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
@@ -1133,7 +1239,6 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         ReadableInstant instant = (ReadableInstant) item;
         return instant.isAfter(shouldBeEarlier);
       }
-
       @Override
       public void describeTo(Description description) {
         description.appendText("later than ").appendValue(shouldBeEarlier);
@@ -1165,4 +1270,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     }
     return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
+
+  private final CommittedResult result(
+      AppliedPTransform<?, ?, ?> transform,
+      Iterable<? extends CommittedBundle<?>> bundles) {
+    return CommittedResult.create(StepTransformResult.withoutHold(transform)
+        .build(), bundles);
+  }
 }



[2/2] incubator-beam git commit: This closes #265

Posted by ke...@apache.org.
This closes #265


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34e05954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34e05954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34e05954

Branch: refs/heads/master
Commit: 34e05954dc70610cb519348dc6b7ced1934bf574
Parents: 69ec223 dff82ca
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 2 20:07:35 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 2 20:07:35 2016 -0700

----------------------------------------------------------------------
 .../direct/InMemoryWatermarkManager.java        |  19 +-
 .../direct/InProcessEvaluationContext.java      |   6 +-
 .../direct/InMemoryWatermarkManagerTest.java    | 368 ++++++++++++-------
 3 files changed, 252 insertions(+), 141 deletions(-)
----------------------------------------------------------------------