You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 18:46:54 UTC

[1/4] incubator-beam git commit: Refactor CompletionCallbacks

Repository: incubator-beam
Updated Branches:
  refs/heads/master da1b7556b -> 272493ed7


Refactor CompletionCallbacks

The default and timerful completion callbacks are identical, excepting
their calls to evaluationContext.commitResult; factor that code into a
common location.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7df160a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7df160a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7df160a

Branch: refs/heads/master
Commit: e7df160a2cde6dead6c4f7e0ec0aaa5e4808239d
Parents: 9b9d73f
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:22:13 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 3 14:27:22 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7df160a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 18af363..6f26b6b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   }
 
   /**
-   * The default {@link CompletionCallback}. The default completion callback is used to complete
-   * transform evaluations that are triggered due to the arrival of elements from an upstream
-   * transform, or for a source transform.
+   * The base implementation of {@link CompletionCallback} that provides implementations for
+   * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and
+   * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of
+   * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}.
    */
-  private class DefaultCompletionCallback implements CompletionCallback {
+  private abstract class CompletionCallbackBase implements CompletionCallback {
+    protected abstract CommittedResult getCommittedResult(
+        CommittedBundle<?> inputBundle,
+        InProcessTransformResult result);
+
     @Override
-    public CommittedResult handleResult(
+    public final CommittedResult handleResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
+      CommittedResult committedResult = getCommittedResult(inputBundle, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
       }
@@ -233,12 +237,27 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   }
 
   /**
+   * The default {@link CompletionCallback}. The default completion callback is used to complete
+   * transform evaluations that are triggered due to the arrival of elements from an upstream
+   * transform, or for a source transform.
+   */
+  private class DefaultCompletionCallback extends CompletionCallbackBase {
+    @Override
+    public CommittedResult getCommittedResult(
+        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      return evaluationContext.handleResult(inputBundle,
+          Collections.<TimerData>emptyList(),
+          result);
+    }
+  }
+
+  /**
    * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
    * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
    * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
    * as part of the result.
    */
-  private class TimerCompletionCallback implements CompletionCallback {
+  private class TimerCompletionCallback extends CompletionCallbackBase {
     private final Iterable<TimerData> timers;
 
     private TimerCompletionCallback(Iterable<TimerData> timers) {
@@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     }
 
     @Override
-    public CommittedResult handleResult(
+    public CommittedResult getCommittedResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, timers, result);
-      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-      return committedResult;
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+          return evaluationContext.handleResult(inputBundle, timers, result);
     }
   }
 


[4/4] incubator-beam git commit: This closes #283

Posted by ke...@apache.org.
This closes #283


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/272493ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/272493ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/272493ed

Branch: refs/heads/master
Commit: 272493ed7c86b81f3573ea341f6a2ff138da9800
Parents: da1b755 0518fc6
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 9 11:46:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 11:46:21 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    |  16 ++-
 .../direct/ExecutorServiceParallelExecutor.java |  65 +++++-----
 .../direct/InMemoryWatermarkManager.java        |   7 +-
 .../direct/InProcessEvaluationContext.java      |   6 +-
 .../direct/InProcessTransformResult.java        |   7 +
 .../runners/direct/StepTransformResult.java     |  17 +++
 .../runners/direct/CommittedResultTest.java     |  34 ++++-
 .../direct/InMemoryWatermarkManagerTest.java    | 127 ++++++++++++++++---
 .../runners/direct/TransformExecutorTest.java   |  11 +-
 9 files changed, 239 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/272493ed/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/272493ed/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------


[3/4] incubator-beam git commit: Allow InProcess TransformEvaluators to refuse inputs

Posted by ke...@apache.org.
Allow InProcess TransformEvaluators to refuse inputs

Inputs that cannot be processed (generally due to a side input not being
ready) can be added to a list of unprocessed elements, which will
schedule them to be executed at a later point.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0518fc68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0518fc68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0518fc68

Branch: refs/heads/master
Commit: 0518fc687febca6dca75159924265fdc6196a572
Parents: 59cca8d
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:24:25 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 11:17:37 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    |  16 ++-
 .../direct/ExecutorServiceParallelExecutor.java |   6 +
 .../direct/InMemoryWatermarkManager.java        |   7 +-
 .../direct/InProcessEvaluationContext.java      |   6 +-
 .../direct/InProcessTransformResult.java        |   7 +
 .../runners/direct/StepTransformResult.java     |  17 +++
 .../runners/direct/CommittedResultTest.java     |  34 ++++-
 .../direct/InMemoryWatermarkManagerTest.java    | 127 ++++++++++++++++---
 .../runners/direct/TransformExecutorTest.java   |  11 +-
 9 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index d15e012..4a42e34 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 import com.google.auto.value.AutoValue;
 
+import javax.annotation.Nullable;
+
 /**
  * A {@link InProcessTransformResult} that has been committed.
  */
@@ -34,13 +36,25 @@ abstract class CommittedResult {
   public abstract AppliedPTransform<?, ?, ?> getTransform();
 
   /**
+   * Returns the {@link CommittedBundle} that contains the input elements that could not be
+   * processed by the evaluation.
+   *
+   * <p>{@code null} if the input bundle was null.
+   */
+  @Nullable
+  public abstract CommittedBundle<?> getUnprocessedInputs();
+
+  /**
    * Returns the outputs produced by the transform.
    */
   public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
 
   public static CommittedResult create(
-      InProcessTransformResult original, Iterable<? extends CommittedBundle<?>> outputs) {
+      InProcessTransformResult original,
+      CommittedBundle<?> unprocessedElements,
+      Iterable<? extends CommittedBundle<?>> outputs) {
     return new AutoValue_CommittedResult(original.getTransform(),
+        unprocessedElements,
         outputs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index fd4cc2c..570ddc4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -37,6 +37,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -230,6 +231,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
             valueToConsumers.get(outputBundle.getPCollection())));
       }
+      CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
+      if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
+        allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs,
+            Collections.<AppliedPTransform<?, ?, ?>>singleton(committedResult.getTransform())));
+      }
       return committedResult;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 4d5a3a1..87ea4d5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -849,8 +849,6 @@ public class InMemoryWatermarkManager {
       CommittedBundle<?> input,
       TimerUpdate timerUpdate,
       CommittedResult result) {
-    TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
-
     // Newly pending elements must be added before completed elements are removed, as the two
     // do not share a Mutex within this call and thus can be interleaved with external calls to
     // refresh.
@@ -861,6 +859,11 @@ public class InMemoryWatermarkManager {
       }
     }
 
+    TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
+    if (input != null) {
+      // Add the unprocessed inputs
+      completedTransform.addPending(result.getUnprocessedInputs());
+    }
     completedTransform.updateTimers(timerUpdate);
     if (input != null) {
       completedTransform.removePending(input);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d4f891e..5c19287 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -152,7 +152,11 @@ class InProcessEvaluationContext {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
     // Update watermarks and timers
-    CommittedResult committedResult = CommittedResult.create(result, committedBundles);
+    CommittedResult committedResult = CommittedResult.create(result,
+        completedBundle == null ?
+            null :
+            completedBundle.withElements((Iterable) result.getUnprocessedElements()),
+        committedBundles);
     watermarkManager.updateWatermarks(
         completedBundle,
         result.getTimerUpdate().withCompletedTimers(completedTimers),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
index a132c33..0bc3ea1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 
@@ -45,6 +46,12 @@ public interface InProcessTransformResult {
   Iterable<? extends UncommittedBundle<?>> getOutputBundles();
 
   /**
+   * Returns elements that were provided to the {@link TransformEvaluator} as input but were not
+   * processed.
+   */
+  Iterable<? extends WindowedValue<?>> getUnprocessedElements();
+
+  /**
    * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
    * not use a {@link CounterSet}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 46e7d04..b2e3897 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
 public class StepTransformResult implements InProcessTransformResult {
   private final AppliedPTransform<?, ?, ?> transform;
   private final Iterable<? extends UncommittedBundle<?>> bundles;
+  private final Iterable<? extends WindowedValue<?>> unprocessedElements;
   @Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
   private final TimerUpdate timerUpdate;
   @Nullable private final CounterSet counters;
@@ -49,12 +51,14 @@ public class StepTransformResult implements InProcessTransformResult {
   private StepTransformResult(
       AppliedPTransform<?, ?, ?> transform,
       Iterable<? extends UncommittedBundle<?>> outputBundles,
+      Iterable<? extends WindowedValue<?>> unprocessedElements,
       CopyOnAccessInMemoryStateInternals<?> state,
       TimerUpdate timerUpdate,
       CounterSet counters,
       Instant watermarkHold) {
     this.transform = checkNotNull(transform);
     this.bundles = checkNotNull(outputBundles);
+    this.unprocessedElements = checkNotNull(unprocessedElements);
     this.state = state;
     this.timerUpdate = checkNotNull(timerUpdate);
     this.counters = counters;
@@ -67,6 +71,11 @@ public class StepTransformResult implements InProcessTransformResult {
   }
 
   @Override
+  public Iterable<? extends WindowedValue<?>> getUnprocessedElements() {
+    return unprocessedElements;
+  }
+
+  @Override
   public CounterSet getCounters() {
     return counters;
   }
@@ -113,6 +122,7 @@ public class StepTransformResult implements InProcessTransformResult {
   public static class Builder {
     private final AppliedPTransform<?, ?, ?> transform;
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
+    private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder;
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
     private CounterSet counters;
@@ -122,6 +132,7 @@ public class StepTransformResult implements InProcessTransformResult {
       this.transform = transform;
       this.watermarkHold = watermarkHold;
       this.bundlesBuilder = ImmutableList.builder();
+      this.unprocessedElementsBuilder = ImmutableList.builder();
       this.timerUpdate = TimerUpdate.builder(null).build();
     }
 
@@ -129,6 +140,7 @@ public class StepTransformResult implements InProcessTransformResult {
       return new StepTransformResult(
           transform,
           bundlesBuilder.build(),
+          unprocessedElementsBuilder.build(),
           state,
           timerUpdate,
           counters,
@@ -150,6 +162,11 @@ public class StepTransformResult implements InProcessTransformResult {
       return this;
     }
 
+    public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) {
+      unprocessedElementsBuilder.addAll(unprocessed);
+      return this;
+    }
+
     public Builder addOutput(
         UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
       bundlesBuilder.add(outputBundle);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index b30e005..0d1b464 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -46,6 +49,7 @@ import java.util.List;
 @RunWith(JUnit4.class)
 public class CommittedResultTest implements Serializable {
   private transient TestPipeline p = TestPipeline.create();
+  private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
   private transient AppliedPTransform<?, ?, ?> transform =
       AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
       });
@@ -55,12 +59,38 @@ public class CommittedResultTest implements Serializable {
   public void getTransformExtractsFromResult() {
     CommittedResult result =
         CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            bundleFactory.createRootBundle(created).commit(Instant.now()),
             Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
 
     assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
   }
 
   @Test
+  public void getUncommittedElementsEqualInput() {
+    InProcessPipelineRunner.CommittedBundle<Integer> bundle =
+        bundleFactory.createRootBundle(created)
+            .add(WindowedValue.valueInGlobalWindow(2))
+            .commit(Instant.now());
+    CommittedResult result =
+        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            bundle,
+            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+    assertThat(result.getUnprocessedInputs(),
+        Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle));
+  }
+
+  @Test
+  public void getUncommittedElementsNull() {
+    CommittedResult result =
+        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            null,
+            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+    assertThat(result.getUnprocessedInputs(), nullValue());
+  }
+
+  @Test
   public void getOutputsEqualInput() {
     List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
         ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
@@ -70,7 +100,9 @@ public class CommittedResultTest implements Serializable {
                 WindowingStrategy.globalDefault(),
                 PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
     CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs);
+        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            bundleFactory.createRootBundle(created).commit(Instant.now()),
+            outputs);
 
     assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 15cdf8a..b45440d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -71,6 +71,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 /**
  * Tests for {@link InMemoryWatermarkManager}.
  */
@@ -162,6 +164,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(output)),
         new Instant(8000L));
     TransformWatermarks updatedSourceWatermark =
@@ -181,6 +184,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(intsToFlatten.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -213,6 +217,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(secondPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
+            secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         null);
     TransformWatermarks transformAfterProcessing =
@@ -220,6 +225,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(secondPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
+            secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         null);
     assertThat(
@@ -237,6 +243,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
         new Instant(Long.MAX_VALUE));
     TransformWatermarks firstSourceWatermarks =
@@ -267,6 +274,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(firstPcollectionBundle,
         TimerUpdate.empty(),
         result(flattened.getProducingTransformInternal(),
+            firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
         null);
     TransformWatermarks afterConsumingAllInput =
@@ -291,6 +299,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
     TransformWatermarks createdAfterProducing =
@@ -306,6 +315,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
             null);
     TransformWatermarks keyedWatermarks =
@@ -325,6 +335,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         null);
     TransformWatermarks filteredProcessedWatermarks =
@@ -350,6 +361,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
 
@@ -360,6 +372,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     TransformWatermarks keyedWatermarks =
@@ -389,17 +402,20 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             ImmutableList.of(firstKeyBundle, secondKeyBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(firstKeyBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            firstKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(-1000L));
     manager.updateWatermarks(secondKeyBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(1234L));
 
@@ -414,6 +430,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(fauxFirstKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -424,6 +441,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(5678L));
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
@@ -431,6 +449,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     assertThat(filteredWatermarks.getOutputWatermark(),
@@ -447,6 +466,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,  TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(firstInput)),
         new Instant(0L));
     TransformWatermarks firstWatermarks =
@@ -458,6 +478,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(secondInput)),
         new Instant(-250L));
     TransformWatermarks secondWatermarks =
@@ -473,10 +494,12 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   public void updateWatermarkWithHoldsShouldBeMonotonic() {
     CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
         TimestampedValue.of(1, new Instant(1_000_000L)),
-            TimestampedValue.of(2, new Instant(1234L)),
-            TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null,
+        TimestampedValue.of(2, new Instant(1234L)),
+        TimestampedValue.of(3, new Instant(-1000L)));
+    manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
 
@@ -487,6 +510,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     TransformWatermarks keyedWatermarks =
@@ -505,6 +529,40 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(updatedWatermarks.getOutputWatermark(), equalTo(oldOutputWatermark));
   }
 
+  @Test
+  public void updateWatermarkWithUnprocessedElements() {
+    WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> second =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
+    WindowedValue<Integer> third =
+        WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
+    CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts)
+        .add(first)
+        .add(second)
+        .add(third)
+        .commit(clock.now());
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.<CommittedBundle<?>>singleton(createdBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
+        TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
+    manager.updateWatermarks(createdBundle,
+        TimerUpdate.empty(),
+        result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(ImmutableList.of(second, third)),
+            Collections.<CommittedBundle<?>>singleton(keyBundle)),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks keyedWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    // the unprocessed second and third are readded to pending
+    assertThat(
+        keyedWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
+  }
+
   /**
    * Demonstrates that updateWatermarks in the presence of late data is monotonic.
    */
@@ -517,6 +575,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         sourceWatermark);
 
@@ -528,6 +587,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         null);
     TransformWatermarks onTimeWatermarks =
@@ -542,6 +602,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
         new Instant(2_000_000L));
     TransformWatermarks bufferedLateWm =
@@ -560,6 +621,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(lateDataBundle,
         TimerUpdate.empty(),
         result(keyed.getProducingTransformInternal(),
+            lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
         null);
   }
@@ -567,8 +629,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   public void updateWatermarkWithDifferentWindowedValueInstances() {
     manager.updateWatermarks(
         null,
-        TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null,
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
                 .createRootBundle(createdInts)
@@ -576,12 +637,14 @@ public class InMemoryWatermarkManagerTest implements Serializable {
                 .commit(Instant.now()))),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    manager.updateWatermarks(
-        bundleFactory.createRootBundle(createdInts)
-            .add(WindowedValue.valueInGlobalWindow(1))
-            .commit(Instant.now()),
+    CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts)
+        .add(WindowedValue.valueInGlobalWindow(1))
+        .commit(Instant.now());
+    manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(), Collections.<CommittedBundle<?>>emptyList()),
+        result(keyed.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>emptyList()),
         null);
     TransformWatermarks onTimeWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -598,6 +661,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks updatedSourceWatermarks =
@@ -627,6 +691,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
         new Instant(12_000L));
 
@@ -634,6 +699,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(firstCreateOutput,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
         new Instant(10_000L));
     TransformWatermarks firstFilterWatermarks =
@@ -645,6 +711,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks updatedSourceWatermarks =
@@ -687,6 +754,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks createAfterUpdate =
@@ -716,6 +784,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createOutput,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks filterAfterConsumed =
@@ -737,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   //  @Test
   public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
-    manager.updateWatermarks(null,  TimerUpdate.empty(),
+    manager.updateWatermarks(null,
+        TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(1248L));
 
@@ -759,6 +830,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         timers,
         result(filtered.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     Instant startTime = clock.now();
@@ -796,6 +868,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         TimerUpdate.builder("key")
             .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
         result(filtered.getProducingTransformInternal(),
+            filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -809,6 +882,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(filteredTimerResult,
         TimerUpdate.empty(),
         result(filteredTimesTwo.getProducingTransformInternal(),
+            filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -846,6 +920,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks createAfterUpdate =
@@ -859,6 +934,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -871,6 +947,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(40_900L));
 
@@ -881,6 +958,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(created,
         TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
         result(filtered.getProducingTransformInternal(),
+            created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -901,6 +979,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         TimerUpdate.builder("key")
             .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
         result(filtered.getProducingTransformInternal(),
+            otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -914,6 +993,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(29_919_235L));
 
@@ -924,6 +1004,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         created,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
+            created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -946,7 +1027,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.singleton(createdBundle)),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -966,6 +1049,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(createdBundle,
         update,
         result(filtered.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
@@ -982,6 +1066,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
@@ -1007,7 +1092,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.singleton(createdBundle)),
         new Instant(1500L));
 
     TimerData earliestTimer =
@@ -1028,7 +1115,8 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         createdBundle,
         update,
         result(filtered.getProducingTransformInternal(),
-        Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -1045,6 +1133,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
@@ -1070,7 +1159,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+        result(createdInts.getProducingTransformInternal(),
+            null,
+            Collections.singleton(createdBundle)),
         new Instant(1500L));
 
     TimerData earliestTimer = TimerData.of(
@@ -1091,6 +1182,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         createdBundle,
         update,
         result(filtered.getProducingTransformInternal(),
+            createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
 
@@ -1109,6 +1201,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
         result(createdInts.getProducingTransformInternal(),
+            null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
@@ -1273,8 +1366,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
   private final CommittedResult result(
       AppliedPTransform<?, ?, ?> transform,
+      @Nullable CommittedBundle<?> unprocessedBundle,
       Iterable<? extends CommittedBundle<?>> bundles) {
-    return CommittedResult.create(StepTransformResult.withoutHold(transform)
-        .build(), bundles);
+    return CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+        unprocessedBundle,
+        bundles);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 959e9d3..8b6053e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -491,7 +491,16 @@ public class TransformExecutorTest {
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
       handledResult = result;
       onMethod.countDown();
-      return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList());
+      @SuppressWarnings("rawtypes") Iterable unprocessedElements =
+          result.getUnprocessedElements() == null ?
+              Collections.emptyList() :
+              result.getUnprocessedElements();
+
+      CommittedBundle<?> unprocessedBundle =
+          inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
+      return CommittedResult.create(result,
+          unprocessedBundle,
+          Collections.<CommittedBundle<?>>emptyList());
     }
 
     @Override



[2/4] incubator-beam git commit: Use AutoValue for ExecutorUpdate

Posted by ke...@apache.org.
Use AutoValue for ExecutorUpdate

Explicitly provide the collections the Bundle should be consumed by in
the update.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59cca8dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59cca8dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59cca8dd

Branch: refs/heads/master
Commit: 59cca8ddae3d544beea9684719409efe3acbe634
Parents: e7df160
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 6 10:33:32 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 10:55:53 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 59 ++++++++++----------
 1 file changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59cca8dd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 6f26b6b..fd4cc2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
@@ -191,8 +192,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     return keyedPValues.contains(pvalue);
   }
 
-  private void scheduleConsumers(CommittedBundle<?> bundle) {
-    for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
+  private void scheduleConsumers(ExecutorUpdate update) {
+    CommittedBundle<?> bundle = update.getBundle().get();
+    for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
       scheduleConsumption(consumer, bundle, defaultCompletionCallback);
     }
   }
@@ -225,7 +227,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
       CommittedResult committedResult = getCommittedResult(inputBundle, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
+            valueToConsumers.get(outputBundle.getPCollection())));
       }
       return committedResult;
     }
@@ -276,38 +279,36 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
    *
    * Used to signal when the executor should be shut down (due to an exception).
    */
-  private static class ExecutorUpdate {
-    private final Optional<? extends CommittedBundle<?>> bundle;
-    private final Optional<? extends Throwable> throwable;
-
-    public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
-      return new ExecutorUpdate(bundle, null);
+  @AutoValue
+  abstract static class ExecutorUpdate {
+    public static ExecutorUpdate fromBundle(
+        CommittedBundle<?> bundle,
+        Collection<AppliedPTransform<?, ?, ?>> consumers) {
+      return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+          Optional.of(bundle),
+          consumers,
+          Optional.<Throwable>absent());
     }
 
     public static ExecutorUpdate fromThrowable(Throwable t) {
-      return new ExecutorUpdate(null, t);
-    }
-
-    private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
-      this.bundle = Optional.fromNullable(producedBundle);
-      this.throwable = Optional.fromNullable(throwable);
+      return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+          Optional.<CommittedBundle<?>>absent(),
+          Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+          Optional.of(t));
     }
 
-    public Optional<? extends CommittedBundle<?>> getBundle() {
-      return bundle;
-    }
+    /**
+     * Returns the bundle that produced this update.
+     */
+    public abstract Optional<? extends CommittedBundle<?>> getBundle();
 
-    public Optional<? extends Throwable> getException() {
-      return throwable;
-    }
+    /**
+     * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return
+     * a present {@link Optional}.
+     */
+    public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(ExecutorUpdate.class)
-          .add("bundle", bundle)
-          .add("exception", throwable)
-          .toString();
-    }
+    public abstract Optional<? extends Throwable> getException();
   }
 
   /**
@@ -353,7 +354,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
         while (update != null) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
-            scheduleConsumers(update.getBundle().get());
+            scheduleConsumers(update);
           } else if (update.getException().isPresent()) {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
           }