You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/06 22:31:18 UTC

[2/4] incubator-beam git commit: Add RootTransformEvaluatorFactory

Add RootTransformEvaluatorFactory

Use it for the evaluator factories of root transforms (e.g. Read.Bounded and TestStream).

These transforms generate their own initial inputs. The factory supplies
those inputs, and the executor provides them back to the transform's
evaluators, which use them to produce the elements of the root
PCollections.

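Update ExecutorServiceParallelExecutor to schedule roots from the pending
initial-input bundles obtained for each provided root transform, instead of
evaluating roots with a null input bundle.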


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7306e16b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7306e16b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7306e16b

Branch: refs/heads/master
Commit: 7306e16b7c92b16fac4491dcc07e6b45cd7fff62
Parents: 0ddba6d
Author: Thomas Groh <tg...@google.com>
Authored: Fri Sep 30 16:28:35 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:14:37 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/game/LeaderBoardTest.java |   5 +-
 .../direct/BoundedReadEvaluatorFactory.java     | 117 ++++----
 .../runners/direct/DirectTimerInternals.java    |   2 +-
 .../beam/runners/direct/EvaluationContext.java  |   5 +
 .../direct/ExecutorServiceParallelExecutor.java |  54 ++--
 .../runners/direct/FlattenEvaluatorFactory.java |  35 ++-
 .../direct/RootTransformEvaluatorFactory.java   |  42 +++
 .../direct/TestStreamEvaluatorFactory.java      | 140 +++++-----
 .../direct/TransformEvaluatorFactory.java       |   2 +-
 .../direct/TransformEvaluatorRegistry.java      |  26 +-
 .../direct/UnboundedReadEvaluatorFactory.java   | 267 ++++++++++---------
 .../beam/runners/direct/WatermarkManager.java   |  11 +
 .../direct/BoundedReadEvaluatorFactoryTest.java | 139 +++++-----
 .../direct/FlattenEvaluatorFactoryTest.java     |  14 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  | 196 ++++++--------
 .../UnboundedReadEvaluatorFactoryTest.java      | 204 +++++++-------
 16 files changed, 666 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 40cac36..9cba704 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.examples.complete.game;
 
-import static org.apache.beam.sdk.testing.PAssert.that;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 
@@ -107,7 +106,7 @@ public class LeaderBoardTest implements Serializable {
 
     String blueTeam = TestUser.BLUE_ONE.getTeam();
     String redTeam = TestUser.RED_ONE.getTeam();
-    that(teamScores)
+    PAssert.that(teamScores)
         .inOnTimePane(new IntervalWindow(baseTime, TEAM_WINDOW_DURATION))
         .containsInAnyOrder(KV.of(blueTeam, 12), KV.of(redTeam, 4));
 
@@ -339,7 +338,7 @@ public class LeaderBoardTest implements Serializable {
     // User scores are emitted in speculative panes in the Global Window - this matcher choice
     // ensures that panes emitted by the watermark advancing to positive infinity are not included,
     // as that will not occur outside of tests
-    that(userScores)
+    PAssert.that(userScores)
         .inEarlyGlobalWindowPanes()
         .containsInAnyOrder(KV.of(TestUser.BLUE_ONE.getUser(), 15),
             KV.of(TestUser.RED_ONE.getUser(), 7),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 2260135..4936ad9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -17,16 +17,17 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.auto.value.AutoValue;
 import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collection;
+import java.util.Collections;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,64 +39,44 @@ import org.apache.beam.sdk.values.PCollection;
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
  * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
  */
-final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure data is not read multiple times.
-   * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
-   * retriggered.
-   */
-  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, Queue<? extends BoundedReadEvaluator<?>>>
-      sourceEvaluators;
+final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
 
   BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
-    sourceEvaluators = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return createInitialSplits((AppliedPTransform) transform);
+  }
+
+  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+      AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+    BoundedSource<OutputT> source = transform.getTransform().getSource();
+    return Collections.<CommittedBundle<?>>singleton(
+        evaluationContext
+            .<BoundedSourceShard<OutputT>>createRootBundle()
+            .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
+            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle)
-      throws IOException {
-    return getTransformEvaluator((AppliedPTransform) application);
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws IOException {
+    return createEvaluator((AppliedPTransform) application);
   }
 
-  @Override
-  public void cleanup() {}
-
-  /**
-   * Get a {@link TransformEvaluator} that produces elements for the provided application of {@link
-   * Bounded Read.Bounded}, initializing the queue of evaluators if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
+  private <OutputT> TransformEvaluator<?> createEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
-    if (evaluatorQueue == null) {
-      evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
-        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
-        // factory for this transform
-        Bounded<OutputT> bound = (Bounded<OutputT>) transform.getTransform();
-        BoundedSource<OutputT> source = bound.getSource();
-        BoundedReadEvaluator<OutputT> evaluator =
-            new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source);
-        evaluatorQueue.offer(evaluator);
-      } else {
-        // otherwise return the existing Queue that arrived before us
-        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
-      }
-    }
-    return evaluatorQueue.poll();
+    return new BoundedReadEvaluator<>(transform, evaluationContext);
   }
 
+  @Override
+  public void cleanup() {}
+
   /**
    * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
    * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
@@ -105,44 +86,50 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
    * may produce duplicate elements.
    */
-  private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
+  private static class BoundedReadEvaluator<OutputT>
+      implements TransformEvaluator<BoundedSourceShard<OutputT>> {
     private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
     private final EvaluationContext evaluationContext;
-    /**
-     * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same as
-     * the source derived from {@link #transform} due to splitting.
-     */
-    private BoundedSource<OutputT> source;
+    private Builder resultBuilder;
 
     public BoundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, ?> transform,
-        EvaluationContext evaluationContext,
-        BoundedSource<OutputT> source) {
+        EvaluationContext evaluationContext) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
-      this.source = source;
+      resultBuilder = StepTransformResult.withoutHold(transform);
     }
 
     @Override
-    public void processElement(WindowedValue<Object> element) {}
-
-    @Override
-    public TransformResult finishBundle() throws IOException {
+    public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element)
+        throws IOException {
+      BoundedSource<OutputT> source = element.getValue().getSource();
       try (final BoundedReader<OutputT> reader =
           source.createReader(evaluationContext.getPipelineOptions())) {
         boolean contentsRemaining = reader.start();
-        UncommittedBundle<OutputT> output =
-            evaluationContext.createBundle(transform.getOutput());
+        UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
         while (contentsRemaining) {
           output.add(
               WindowedValue.timestampedValueInGlobalWindow(
                   reader.getCurrent(), reader.getCurrentTimestamp()));
           contentsRemaining = reader.advance();
         }
-        return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
-            .addOutput(output)
-            .build();
+        resultBuilder.addOutput(output);
       }
     }
+
+    @Override
+    public TransformResult finishBundle()  {
+      return resultBuilder.build();
+    }
+  }
+
+  @AutoValue
+  abstract static class BoundedSourceShard<T> {
+    static <T> BoundedSourceShard<T> of(BoundedSource<T> source) {
+      return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source);
+    }
+
+    abstract BoundedSource<T> getSource();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 4003983..3158577 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -27,7 +27,7 @@ import org.joda.time.Instant;
 /**
  * An implementation of {@link TimerInternals} where all relevant data exists in memory.
  */
-public class DirectTimerInternals implements TimerInternals {
+class DirectTimerInternals implements TimerInternals {
   private final Clock processingTimeClock;
   private final TransformWatermarks watermarks;
   private final TimerUpdateBuilder timerUpdateBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 16cf096..2901254 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -135,6 +135,11 @@ class EvaluationContext {
         WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
   }
 
+  public void initialize(
+      Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> initialInputs) {
+    watermarkManager.initialize(initialInputs);
+  }
+
   /**
    * Handle the provided {@link TransformResult}, produced after evaluating the provided
    * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 53e03c8..bb89699 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -33,7 +32,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -81,7 +82,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private final TransformExecutorService parallelExecutorService;
   private final CompletionCallback defaultCompletionCallback;
 
-  private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
+  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
+      pendingRootBundles;
 
  private final AtomicReference<ExecutorState> state =
       new AtomicReference<>(ExecutorState.QUIESCENT);
@@ -134,6 +136,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     parallelExecutorService = TransformExecutorServices.parallel(executorService);
     defaultCompletionCallback =
         new TimerIterableCompletionCallback(Collections.<TimerData>emptyList());
+    this.pendingRootBundles = new ConcurrentHashMap<>();
   }
 
   private CacheLoader<StepAndKey, TransformExecutorService>
@@ -148,7 +151,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   @Override
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
-    rootNodes = ImmutableList.copyOf(roots);
+    for (AppliedPTransform<?, ?, ?> root : roots) {
+      ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
+      pending.addAll(registry.getInitialInputs(root));
+      pendingRootBundles.put(root, pending);
+    }
+    evaluationContext.initialize(pendingRootBundles);
     Runnable monitorRunnable = new MonitorRunnable();
     executorService.submit(monitorRunnable);
   }
@@ -163,13 +171,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   private <T> void evaluateBundle(
       final AppliedPTransform<?, ?, ?> transform,
-      @Nullable final CommittedBundle<T> bundle,
+      final CommittedBundle<T> bundle,
       final CompletionCallback onComplete) {
     TransformExecutorService transformExecutor;
 
-    if (bundle != null && isKeyed(bundle.getPCollection())) {
-      final StepAndKey stepAndKey =
-          StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
+    if (isKeyed(bundle.getPCollection())) {
+      final StepAndKey stepAndKey = StepAndKey.of(transform, bundle.getKey());
       // This executor will remain reachable until it has executed all scheduled transforms.
       // The TransformExecutors keep a strong reference to the Executor, the ExecutorService keeps
       // a reference to the scheduled TransformExecutor callable. Follow-up TransformExecutors
@@ -247,8 +254,16 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       }
       CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
       if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs,
-            Collections.<AppliedPTransform<?, ?, ?>>singleton(committedResult.getTransform())));
+        if (inputBundle.getPCollection() == null) {
+          // TODO: Split this logic out of an if statement
+          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs);
+        } else {
+          allUpdates.offer(
+              ExecutorUpdate.fromBundle(
+                  unprocessedInputs,
+                  Collections.<AppliedPTransform<?, ?, ?>>singleton(
+                      committedResult.getTransform())));
+        }
       }
       if (!committedResult.getProducedOutputTypes().isEmpty()) {
         state.set(ExecutorState.ACTIVE);
@@ -369,8 +384,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
         for (ExecutorUpdate update : updates) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
-            if (ExecutorState.ACTIVE == startingState || (ExecutorState.PROCESSING == startingState
-                && noWorkOutstanding)) {
+            if (ExecutorState.ACTIVE == startingState
+                || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) {
               scheduleConsumers(update);
             } else {
               allUpdates.offer(update);
@@ -402,7 +417,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     }
 
     /**
-     * Fires any available timers. Returns true if at least one timer was fired.
+     * Fires any available timers.
      */
     private void fireTimers() throws Exception {
       try {
@@ -419,6 +434,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
               }
               KeyedWorkItem<?, Object> work =
                   KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery);
+              LOG.warn("Delivering {} timers for {}", delivery.size(), keyTimers.getKey().getKey());
               @SuppressWarnings({"unchecked", "rawtypes"})
               CommittedBundle<?> bundle =
                   evaluationContext
@@ -464,9 +480,17 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       // If any timers have fired, they will add more work; We don't need to add more
       if (state.get() == ExecutorState.QUIESCENT) {
         // All current TransformExecutors are blocked; add more work from the roots.
-        for (AppliedPTransform<?, ?, ?> root : rootNodes) {
-          if (!evaluationContext.isDone(root)) {
-            scheduleConsumption(root, null, defaultCompletionCallback);
+        for (Map.Entry<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
+            pendingRootEntry : pendingRootBundles.entrySet()) {
+          Collection<CommittedBundle<?>> bundles = new ArrayList<>();
+          // Pull all available work off of the queue, then schedule it all, so this loop
+          // terminates
+          while (!pendingRootEntry.getValue().isEmpty()) {
+            CommittedBundle<?> bundle = pendingRootEntry.getValue().poll();
+            bundles.add(bundle);
+          }
+          for (CommittedBundle<?> bundle : bundles) {
+            scheduleConsumption(pendingRootEntry.getKey(), bundle, defaultCompletionCallback);
             state.set(ExecutorState.ACTIVE);
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 4fa8854..90db040 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.util.Collection;
+import java.util.Collections;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -31,21 +34,32 @@ import org.apache.beam.sdk.values.PCollectionList;
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
  * {@link PTransform}.
  */
-class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
+class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
 
   FlattenEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
   }
 
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>Returns a single empty bundle. {@link Flatten} on no inputs produces no outputs. This bundle
+   * ensures that any {@link PTransform PTransforms} that consume from the output of the provided
+   * {@link AppliedPTransform} have watermarks updated as appropriate.
+   */
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return Collections.<CommittedBundle<?>>singleton(
+        evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle
-      ) {
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createInMemoryEvaluator(
-            (AppliedPTransform) application, inputBundle);
+    TransformEvaluator<InputT> evaluator =
+        (TransformEvaluator<InputT>) createInMemoryEvaluator((AppliedPTransform) application);
     return evaluator;
   }
 
@@ -55,14 +69,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
   private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
       final AppliedPTransform<
               PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
-          application,
-      final CommittedBundle<InputT> inputBundle) {
-    if (inputBundle == null) {
-      // it is impossible to call processElement on a flatten with no input bundle. A Flatten with
-      // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty())
-      return new FlattenEvaluator<>(
-          null, StepTransformResult.withoutHold(application).build());
-    }
+          application) {
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(application.getOutput());
     final TransformResult result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
new file mode 100644
index 0000000..5785dea
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A {@link TransformEvaluatorFactory} for {@link PTransform PTransforms} that are at the root of a
+ * {@link Pipeline}. Provides a way to get initial inputs, which will cause the {@link PTransform}
+ * to produce all of the appropriate output.
+ */
+interface RootTransformEvaluatorFactory extends TransformEvaluatorFactory {
+  /**
+   * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
+   * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
+   *
+   * <p>For source transforms, these should be sufficient that, when provided to the evaluators
+   * produced by {@link #forApplication(AppliedPTransform, CommittedBundle)}, all of the elements
+   * contained in the source are eventually produced.
+   */
+  Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 5a94143..8e634c8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -20,10 +20,12 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -50,21 +52,37 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
-class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
-  private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators =
-      LockedKeyedResourcePool.create();
+class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
 
   TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
     this.evaluationContext = evaluationContext;
   }
 
+  /**
+   * Returns a single root bundle, committed at {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, that
+   * holds a {@link TestStreamIndex} pointing at the first event of the transform's
+   * {@link TestStream}.
+   */
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return createInputBundle((AppliedPTransform) transform);
+  }
+
+  private <T> Collection<CommittedBundle<?>> createInputBundle(
+      AppliedPTransform<?, ?, TestStream<T>> transform) {
+    CommittedBundle<TestStreamIndex<T>> initialBundle =
+        evaluationContext
+            .<TestStreamIndex<T>>createRootBundle()
+            .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform())))
+            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    return Collections.<CommittedBundle<?>>singleton(initialBundle);
+  }
+
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle)
-      throws Exception {
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
     return createEvaluator((AppliedPTransform) application);
   }
 
@@ -80,70 +98,64 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
    * a separate collection of events cannot be created.
    */
   private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
-      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application)
-      throws ExecutionException {
-    return evaluators
-        .tryAcquire(application, new CreateEvaluator<>(application, evaluationContext, evaluators))
-        .orNull();
+      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application) {
+    return (TransformEvaluator<InputT>) new Evaluator<>(application, evaluationContext);
   }
 
-  private static class Evaluator<T> implements TransformEvaluator<Object> {
+  private static class Evaluator<T> implements TransformEvaluator<TestStreamIndex<T>> {
     private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
     private final EvaluationContext context;
-    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache;
-    private final List<Event<T>> events;
-    private int index;
-    private Instant currentWatermark;
+    private final StepTransformResult.Builder resultBuilder;
 
     private Evaluator(
         AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
-        EvaluationContext context,
-        KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache) {
+        EvaluationContext context) {
       this.application = application;
       this.context = context;
-      this.cache = cache;
-      this.events = application.getTransform().getEvents();
-      index = 0;
-      currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      this.resultBuilder = StepTransformResult.withoutHold(application);
     }
 
     @Override
-    public void processElement(WindowedValue<Object> element) throws Exception {}
+    public void processElement(WindowedValue<TestStreamIndex<T>> element) throws Exception {
+      TestStreamIndex<T> streamIndex = element.getValue();
+      List<Event<T>> events = streamIndex.getTestStream().getEvents();
+      int index = streamIndex.getIndex();
+      Instant watermark = element.getTimestamp();
+      Event<T> event = events.get(index);
+
+      if (event.getType().equals(EventType.ELEMENT)) {
+        UncommittedBundle<T> bundle = context.createBundle(application.getOutput());
+        for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
+          bundle.add(
+              WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
+        }
+        resultBuilder.addOutput(bundle);
+      }
+
+      if (event.getType().equals(EventType.WATERMARK)) {
+        watermark = ((WatermarkEvent<T>) event).getWatermark();
+      }
+
+      if (event.getType().equals(EventType.PROCESSING_TIME)) {
+        ((TestClock) context.getClock())
+            .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
+      }
+
+      TestStreamIndex<T> next = streamIndex.next();
+      if (next.getIndex() < events.size()) {
+        resultBuilder.addUnprocessedElements(
+            Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(next, watermark)));
+      }
+    }
 
     @Override
     public TransformResult finishBundle() throws Exception {
-      try {
-        if (index >= events.size()) {
-          return StepTransformResult.withHold(application, BoundedWindow.TIMESTAMP_MAX_VALUE)
-              .build();
-        }
-        Event<T> event = events.get(index);
-        if (event.getType().equals(EventType.WATERMARK)) {
-          currentWatermark = ((WatermarkEvent<T>) event).getWatermark();
-        }
-        StepTransformResult.Builder result =
-            StepTransformResult.withHold(application, currentWatermark);
-        if (event.getType().equals(EventType.ELEMENT)) {
-          UncommittedBundle<T> bundle = context.createBundle(application.getOutput());
-          for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
-            bundle.add(
-                WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
-          }
-          result.addOutput(bundle);
-        }
-        if (event.getType().equals(EventType.PROCESSING_TIME)) {
-          ((TestClock) context.getClock())
-              .advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
-        }
-        index++;
-        return result.build();
-      } finally {
-        cache.release(application, this);
-      }
+      return resultBuilder.build();
     }
   }
 
-  private static class TestClock implements Clock {
+  @VisibleForTesting
+  static class TestClock implements Clock {
     private final AtomicReference<Instant> currentTime =
         new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
 
@@ -199,23 +211,23 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     }
   }
 
-  private static class CreateEvaluator<OutputT> implements Callable<Evaluator<?>> {
-    private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application;
-    private final EvaluationContext evaluationContext;
-    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators;
-
-    public CreateEvaluator(
-        AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application,
-        EvaluationContext evaluationContext,
-        KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators) {
-      this.application = application;
-      this.evaluationContext = evaluationContext;
-      this.evaluators = evaluators;
+  /**
+   * The input element for a {@link TestStream} evaluator: the stream together with the index of
+   * the next {@link Event} to emit. {@link #next()} returns a new instance for the following
+   * event.
+   */
+  @AutoValue
+  abstract static class TestStreamIndex<T> {
+    static <T> TestStreamIndex<T> of(TestStream<T> stream) {
+      return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(stream, 0);
     }
 
-    @Override
-    public Evaluator<?> call() throws Exception {
-      return new Evaluator<>(application, evaluationContext, evaluators);
+    abstract TestStream<T> getTestStream();
+    abstract int getIndex();
+
+    TestStreamIndex<T> next() {
+      return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(
+          getTestStream(), getIndex() + 1);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index e4f3e0c..efbe137 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -48,7 +48,7 @@ public interface TransformEvaluatorFactory {
    */
   @Nullable
   <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle)
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
       throws Exception;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 08b636e..3332c2a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ImmutableMap;
@@ -24,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
  * implementations based on the type of {@link PTransform} of the application.
  */
-class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory {
   private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
   public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) {
     @SuppressWarnings("rawtypes")
@@ -77,15 +77,42 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   }
 
+  /**
+   * Returns the initial inputs produced by the {@link RootTransformEvaluatorFactory} registered
+   * for the class of {@code transform}'s {@link PTransform}.
+   *
+   * @throws IllegalStateException if this registry is already finished
+   * @throws IllegalArgumentException if no {@link RootTransformEvaluatorFactory} is registered
+   *     for that class
+   */
   @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    checkState(
+        !finished.get(), "Tried to get initial inputs for a finished TransformEvaluatorRegistry");
+    TransformEvaluatorFactory factory = getFactory(transform);
+    checkArgument(
+        factory instanceof RootTransformEvaluatorFactory,
+        "Tried to get Initial Inputs for Transform %s. %s does not have an associated %s",
+        transform.getFullName(),
+        transform.getTransform().getClass().getSimpleName(),
+        RootTransformEvaluatorFactory.class.getSimpleName());
+    return ((RootTransformEvaluatorFactory) factory).getInitialInputs(transform);
+  }
+
+  @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle)
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
       throws Exception {
     checkState(
         !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry");
-    TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
+    TransformEvaluatorFactory factory = getFactory(application);
     return factory.forApplication(application, inputBundle);
   }
 
+  /** Looks up the factory registered for the exact class of {@code application}'s transform. */
+  private TransformEvaluatorFactory getFactory(AppliedPTransform<?, ?, ?> application) {
+    return factories.get(application.getTransform().getClass());
+  }
+
   @Override
   public void cleanup() throws Exception {
     Collection<Exception> thrownInCleanup = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 557c9a8..1a89695 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -17,14 +17,19 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
@@ -41,77 +46,64 @@ import org.joda.time.Instant;
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
  * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
  */
-class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  // Resume from a checkpoint every nth invocation, to ensure close-and-resume is exercised
-  @VisibleForTesting static final int MAX_READER_REUSE_COUNT = 20;
+class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory {
+  // Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume
+  @VisibleForTesting static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
 
-  /*
-   * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
-   * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
-   * and any splits are honored.
-   *
-   * <p>The Queue storing available evaluators must enforce a happens-before relationship for
-   * elements being added to the queue to accesses after it, to ensure that updates performed to the
-   * state of an evaluator are properly visible. ConcurrentLinkedQueue provides this relation, but
-   * an arbitrary Queue implementation does not, so the concrete type is used explicitly.
-   */
-  private final ConcurrentMap<
-          AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
-      sourceEvaluators;
   private final EvaluationContext evaluationContext;
+  // Currently unused: each UnboundedSourceShard carries its own deduplicator.
+  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, UnboundedReadDeduplicator> deduplicators;
+  private final double readerReuseChance;
 
   UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
+    this(evaluationContext, DEFAULT_READER_REUSE_CHANCE);
+  }
+
+  @VisibleForTesting
+  UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, double readerReuseChance) {
     this.evaluationContext = evaluationContext;
-    sourceEvaluators = new ConcurrentHashMap<>();
+    deduplicators = new ConcurrentHashMap<>();
+    this.readerReuseChance = readerReuseChance;
+  }
+
+  /**
+   * Returns a single root bundle holding an unstarted {@link UnboundedSourceShard} for the
+   * transform's source. The shard uses a {@code CachedIdDeduplicator} if the source requires
+   * deduping, and a {@code NeverDeduplicator} otherwise.
+   */
+  @Override
+  public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+    return createInitialSplits((AppliedPTransform) transform);
+  }
+
+  private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
+      AppliedPTransform<?, ?, Read.Unbounded<OutputT>> transform) {
+    UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
+    UnboundedReadDeduplicator deduplicator =
+        source.requiresDeduping()
+            ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
+            : NeverDeduplicator.create();
+
+    UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(source, deduplicator);
+    return Collections.<CommittedBundle<?>>singleton(
+        evaluationContext
+            .<UnboundedSourceShard<?, ?>>createRootBundle()
+            .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
+            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle) {
-    return getTransformEvaluator((AppliedPTransform) application);
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
+    return createEvaluator((AppliedPTransform) application);
   }
 
-  /**
-   * Get a {@link TransformEvaluator} that produces elements for the provided application of {@link
-   * Unbounded Read.Unbounded}, initializing the queue of evaluators if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  private <OutputT, CheckpointMarkT extends CheckpointMark>
-      TransformEvaluator<?> getTransformEvaluator(
-          final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
-    ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
-        (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
-            sourceEvaluators.get(transform);
-    if (evaluatorQueue == null) {
-      evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
-        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
-        // factory for this transform
-        Unbounded<OutputT> unbounded = (Unbounded<OutputT>) transform.getTransform();
-        UnboundedSource<OutputT, CheckpointMarkT> source =
-            (UnboundedSource<OutputT, CheckpointMarkT>) unbounded.getSource();
-        UnboundedReadDeduplicator deduplicator;
-        if (source.requiresDeduping()) {
-          deduplicator = UnboundedReadDeduplicator.CachedIdDeduplicator.create();
-        } else {
-          deduplicator = UnboundedReadDeduplicator.NeverDeduplicator.create();
-        }
-        UnboundedReadEvaluator<OutputT, CheckpointMarkT> evaluator =
-            new UnboundedReadEvaluator<>(
-                transform, evaluationContext, source, deduplicator, evaluatorQueue);
-        evaluatorQueue.offer(evaluator);
-      } else {
-        // otherwise return the existing Queue that arrived before us
-        evaluatorQueue =
-            (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
-                sourceEvaluators.get(transform);
-      }
-    }
-    return evaluatorQueue.poll();
+  private <OutputT> TransformEvaluator<?> createEvaluator(
+      AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> application) {
+    return new UnboundedReadEvaluator<>(
+        application, evaluationContext, readerReuseChance);
   }
 
   @Override
@@ -128,109 +120,126 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * #finishBundle()}.
    */
   private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends CheckpointMark>
-      implements TransformEvaluator<Object> {
+      implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
 
     private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
     private final EvaluationContext evaluationContext;
-    private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>
-        evaluatorQueue;
-    /**
-     * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
-     * source as derived from {@link #transform} due to splitting.
-     */
-    private final UnboundedSource<OutputT, CheckpointMarkT> source;
-
-    private final UnboundedReadDeduplicator deduplicator;
-    private UnboundedReader<OutputT> currentReader;
-    private CheckpointMarkT checkpointMark;
-
-    /**
-     * The count of bundles output from this {@link UnboundedReadEvaluator}. Used to exercise {@link
-     * UnboundedReader#close()}.
-     */
-    private int outputBundles = 0;
+    private final double readerReuseChance;
+    private final StepTransformResult.Builder resultBuilder;
 
     public UnboundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, ?> transform,
         EvaluationContext evaluationContext,
-        UnboundedSource<OutputT, CheckpointMarkT> source,
-        UnboundedReadDeduplicator deduplicator,
-        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue) {
+        double readerReuseChance) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
-      this.evaluatorQueue = evaluatorQueue;
-      this.source = source;
-      this.currentReader = null;
-      this.deduplicator = deduplicator;
-      this.checkpointMark = null;
+      this.readerReuseChance = readerReuseChance;
+      resultBuilder = StepTransformResult.withoutHold(transform);
     }
 
     @Override
-    public void processElement(WindowedValue<Object> element) {}
-
-    @Override
-    public TransformResult finishBundle() throws IOException {
+    public void processElement(
+        WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
       UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
+      UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
+      UnboundedReader<OutputT> reader = null;
       try {
-        boolean elementAvailable = startReader();
+        reader = getReader(shard);
+        boolean elementAvailable = startReader(reader, shard);
 
-        Instant watermark = currentReader.getWatermark();
         if (elementAvailable) {
+          UnboundedReadDeduplicator deduplicator = shard.getDeduplicator();
           int numElements = 0;
           do {
-            if (deduplicator.shouldOutput(currentReader.getCurrentRecordId())) {
-              output.add(
-                  WindowedValue.timestampedValueInGlobalWindow(
-                      currentReader.getCurrent(), currentReader.getCurrentTimestamp()));
+            if (deduplicator.shouldOutput(reader.getCurrentRecordId())) {
+              output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(),
+                  reader.getCurrentTimestamp()));
             }
             numElements++;
-          } while (numElements < ARBITRARY_MAX_ELEMENTS && currentReader.advance());
-          watermark = currentReader.getWatermark();
-          // Only take a checkpoint if we did any work
-          finishRead();
+          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+          Instant watermark = reader.getWatermark();
+          UnboundedSourceShard<OutputT, CheckpointMarkT> residual = finishRead(reader, shard);
+          resultBuilder
+              .addOutput(output)
+              .addUnprocessedElements(
+                  Collections.singleton(
+                      WindowedValue.timestampedValueInGlobalWindow(residual, watermark)));
-        }
-        // TODO: When exercising create initial splits, make this the minimum watermark across all
-        // existing readers
-        StepTransformResult result =
-            StepTransformResult.withHold(transform, watermark).addOutput(output).build();
-        evaluatorQueue.offer(this);
-        return result;
+        } else if (reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+          // Nothing was available yet, but the source is not done: keep polling the same reader
+          // rather than dropping the shard (which would leak the reader and end the read).
+          UnboundedSourceShard<OutputT, CheckpointMarkT> residual =
+              UnboundedSourceShard.of(
+                  shard.getSource(), shard.getDeduplicator(), reader, pendingCheckpoint(shard));
+          resultBuilder.addUnprocessedElements(
+              Collections.singleton(
+                  WindowedValue.timestampedValueInGlobalWindow(residual, reader.getWatermark())));
+        } else {
+          // The watermark has reached the end of time, so the shard is done; release its reader.
+          reader.close();
+        }
       } catch (IOException e) {
-        closeReader();
+        if (reader != null) {
+          reader.close();
+        }
         throw e;
       }
     }
 
-    private boolean startReader() throws IOException {
-      if (currentReader == null) {
-        if (checkpointMark != null) {
-          checkpointMark.finalizeCheckpoint();
+    private UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
+        throws IOException {
+      UnboundedReader<OutputT> existing = shard.getExistingReader();
+      if (existing == null) {
+        return shard
+            .getSource()
+            .createReader(evaluationContext.getPipelineOptions(), shard.getCheckpoint());
+      } else {
+        return existing;
+      }
+    }
+
+    private boolean startReader(
+        UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
+        throws IOException {
+      if (shard.getExistingReader() == null) {
+        if (shard.getCheckpoint() != null) {
+          shard.getCheckpoint().finalizeCheckpoint();
         }
-        currentReader = source.createReader(evaluationContext.getPipelineOptions(), checkpointMark);
-        checkpointMark = null;
-        return currentReader.start();
+        return reader.start();
       } else {
-        return currentReader.advance();
+        return shard.getExistingReader().advance();
       }
     }
+
+    /**
+     * Returns the checkpoint of {@code shard} that has not been finalized yet, or {@code null}.
+     *
+     * <p>When {@code shard} has no open reader, {@link #startReader} finalizes its checkpoint
+     * before starting the new reader created from it, so it must not be finalized again.
+     */
+    @Nullable
+    private CheckpointMarkT pendingCheckpoint(
+        UnboundedSourceShard<OutputT, CheckpointMarkT> shard) {
+      return shard.getExistingReader() == null ? null : shard.getCheckpoint();
+    }
 
     /**
-     * Checkpoint the current reader, finalize the previous checkpoint, and update the state of this
-     * evaluator.
+     * Checkpoint the current reader, finalize the previous checkpoint, and return the residual
+     * {@link UnboundedSourceShard}.
      */
-    private void finishRead() throws IOException {
-      final CheckpointMark oldMark = checkpointMark;
+    private UnboundedSourceShard<OutputT, CheckpointMarkT> finishRead(
+        UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
+        throws IOException {
+      final CheckpointMark oldMark = pendingCheckpoint(shard);
       @SuppressWarnings("unchecked")
-      final CheckpointMarkT mark = (CheckpointMarkT) currentReader.getCheckpointMark();
-      checkpointMark = mark;
+      final CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
       if (oldMark != null) {
         oldMark.finalizeCheckpoint();
       }
 
       // If the watermark is the max value, this source may not be invoked again. Finalize after
       // committing the output.
-      if (!currentReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      if (!reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
         evaluationContext.scheduleAfterOutputWouldBeProduced(
             transform.getOutput(),
             GlobalWindow.INSTANCE,
@@ -247,21 +256,53 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
               }
             });
       }
-      // Sometimes resume from a checkpoint even if it's not required
 
-      if (outputBundles >= MAX_READER_REUSE_COUNT) {
-        closeReader();
-        outputBundles = 0;
+      // Sometimes resume from a checkpoint even if it's not required
+      if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
+        reader.close();
+        return UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), null, mark);
       } else {
-        outputBundles++;
+        // Keep the open reader so the next invocation advances it instead of leaking it.
+        return UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, mark);
       }
     }
 
-    private void closeReader() throws IOException {
-      if (currentReader != null) {
-        currentReader.close();
-        currentReader = null;
-      }
+    @Override
+    public TransformResult finishBundle() throws IOException {
+      return resultBuilder.build();
+    }
+  }
+
+  /**
+   * One unit of unbounded-read work: a source and its deduplicator, plus the open reader to keep
+   * reading from and the most recent checkpoint, either of which may be absent. Without an open
+   * reader, a new one is created from the checkpoint.
+   */
+  @AutoValue
+  abstract static class UnboundedSourceShard<T, CheckpointT extends CheckpointMark> {
+    static <T, CheckpointT extends CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted(
+        UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator) {
+      return of(source, deduplicator, null, null);
+    }
+
+    static <T, CheckpointT extends CheckpointMark> UnboundedSourceShard<T, CheckpointT> of(
+        UnboundedSource<T, CheckpointT> source,
+        UnboundedReadDeduplicator deduplicator,
+        @Nullable UnboundedReader<T> reader,
+        @Nullable CheckpointT checkpoint) {
+      return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard<>(
+          source, deduplicator, reader, checkpoint);
+    }
+
+    abstract UnboundedSource<T, CheckpointT> getSource();
+    abstract UnboundedReadDeduplicator getDeduplicator();
+    @Nullable
+    abstract UnboundedReader<T> getExistingReader();
+    @Nullable
+    abstract CheckpointT getCheckpoint();
+
+    UnboundedSourceShard<T, CheckpointT> withCheckpoint(CheckpointT newCheckpoint) {
+      return of(getSource(), getDeduplicator(), getExistingReader(), newCheckpoint);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index ff7428d..b3d1fc5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -800,6 +800,29 @@ public class WatermarkManager {
     return transformToWatermarks.get(transform);
   }
 
+  /**
+   * Adds each initial bundle as pending to the {@link TransformWatermarks} of the root transform
+   * it is keyed by.
+   *
+   * @throws IllegalArgumentException if a key is not a transform tracked by this manager
+   */
+  public void initialize(
+      Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> initialBundles) {
+    for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
+        initialBundles.entrySet()) {
+      TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
+      if (rootWms == null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot initialize watermarks for unknown transform %s",
+                rootEntry.getKey().getFullName()));
+      }
+      for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
+        rootWms.addPending(initialBundle);
+      }
+    }
+  }
+
   /**
    * Updates the watermarks of a transform with one or more inputs.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index e725dd3..8544128 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -21,15 +21,18 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -38,13 +41,14 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,7 +64,7 @@ import org.mockito.MockitoAnnotations;
 public class BoundedReadEvaluatorFactoryTest {
   private BoundedSource<Long> source;
   private PCollection<Long> longs;
-  private TransformEvaluatorFactory factory;
+  private BoundedReadEvaluatorFactory factory;
   @Mock private EvaluationContext context;
   private BundleFactory bundleFactory;
 
@@ -77,78 +81,71 @@ public class BoundedReadEvaluatorFactoryTest {
 
   @Test
   public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createBundle(longs);
-    when(context.createBundle(longs)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(
-        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(),
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-  }
-
-  /**
-   * Demonstrate that acquiring multiple {@link TransformEvaluator TransformEvaluators} for the same
-   * {@link Bounded Read.Bounded} application with the same evaluation context only produces the
-   * elements once.
-   */
-  @Test
-  public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createBundle(longs);
-    when(context.createBundle(longs)).thenReturn(output);
+    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+
+    Collection<CommittedBundle<?>> initialInputs =
+        factory.getInitialInputs(longs.getProducingTransformInternal());
+    List<WindowedValue<?>> outputs = new ArrayList<>();
+    for (CommittedBundle<?> shardBundle : initialInputs) {
+      TransformEvaluator<?> evaluator =
+          factory.forApplication(longs.getProducingTransformInternal(), shardBundle);
+      for (WindowedValue<?> shard : shardBundle.getElements()) {
+        UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs);
+        when(context.createBundle(longs)).thenReturn(outputBundle); // fresh bundle per shard
+        evaluator.processElement((WindowedValue) shard);
+      }
+      TransformResult result = evaluator.finishBundle();
+      assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+      assertThat(
+          Iterables.size(result.getOutputBundles()),
+          equalTo(Iterables.size(shardBundle.getElements())));
+      for (UncommittedBundle<?> output : result.getOutputBundles()) {
+        CommittedBundle<?> committed = output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        for (WindowedValue<?> val : committed.getElements()) {
+          outputs.add(val);
+        }
+      }
+    }
 
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
-    TransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    Iterable<? extends WindowedValue<Long>> outputElements =
-        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
     assertThat(
-        outputElements,
-        containsInAnyOrder(
+        outputs,
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
             gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-
-    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
-    when(context.createBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
-    assertThat(secondEvaluator, nullValue());
   }
 
-  /**
-   * Demonstrates that acquiring multiple evaluators from the factory are independent, but
-   * the elements in the source are only produced once.
-   */
   @Test
-  public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createBundle(longs);
-    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
-    when(context.createBundle(longs)).thenReturn(output).thenReturn(secondOutput);
-
-    // create both evaluators before finishing either.
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null);
-    assertThat(secondEvaluator, nullValue());
-
+  public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<? extends BoundedSource<Long>> splits =
+        source.splitIntoBundles(source.getEstimatedSizeBytes(options) / 2, options);
+
+    UncommittedBundle<BoundedSourceShard<Long>> rootBundle = bundleFactory.createRootBundle();
+    for (BoundedSource<Long> split : splits) {
+      BoundedSourceShard<Long> shard = BoundedSourceShard.of(split);
+      rootBundle.add(WindowedValue.valueInGlobalWindow(shard));
+    }
+    CommittedBundle<BoundedSourceShard<Long>> shards = rootBundle.commit(Instant.now());
+
+    TransformEvaluator<BoundedSourceShard<Long>> evaluator =
+        factory.forApplication(longs.getProducingTransformInternal(), shards);
+    for (WindowedValue<BoundedSourceShard<Long>> shard : shards.getElements()) {
+      UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs);
+      when(context.createBundle(longs)).thenReturn(outputBundle);
+      evaluator.processElement(shard);
+    }
     TransformResult result = evaluator.finishBundle();
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    Iterable<? extends WindowedValue<Long>> outputElements =
-        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
+    assertThat(Iterables.size(result.getOutputBundles()), equalTo(splits.size()));
 
+    List<WindowedValue<?>> outputElems = new ArrayList<>();
+    for (UncommittedBundle<?> outputBundle : result.getOutputBundles()) {
+      CommittedBundle<?> outputs = outputBundle.commit(Instant.now());
+      for (WindowedValue<?> outputElem : outputs.getElements()) {
+        outputElems.add(outputElem);
+      }
+    }
     assertThat(
-        outputElements,
-        containsInAnyOrder(
-            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
-    assertThat(
-        secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
-    assertThat(
-        outputElements,
-        containsInAnyOrder(
+        outputElems,
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
             gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
   }
 
@@ -163,7 +160,10 @@ public class BoundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
+    TransformEvaluator<BoundedSourceShard<Long>> evaluator =
+        factory.forApplication(
+            sourceTransform, bundleFactory.createRootBundle().commit(Instant.now()));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)));
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
@@ -181,7 +181,10 @@ public class BoundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
+    TransformEvaluator<BoundedSourceShard<Long>> evaluator =
+        factory.forApplication(
+            sourceTransform, bundleFactory.createRootBundle().commit(Instant.now()));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)));
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(committed.getElements(), emptyIterable());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 435fc94..86d98e9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Iterables;
+import java.util.Collection;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -120,14 +122,22 @@ public class FlattenEvaluatorFactoryTest {
     PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
+    when(evaluationContext.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    when(evaluationContext.createBundle(flattened))
+        .thenReturn(bundleFactory.createBundle(flattened));
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
+    Collection<CommittedBundle<?>> initialInputs =
+        factory.getInitialInputs(flattened.getProducingTransformInternal());
     TransformEvaluator<Integer> emptyEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), null);
+        factory.forApplication(
+            flattened.getProducingTransformInternal(), Iterables.getOnlyElement(initialInputs));
 
     TransformResult leftSideResult = emptyEvaluator.finishBundle();
 
-    assertThat(leftSideResult.getOutputBundles(), emptyIterable());
+    CommittedBundle<?> outputBundle =
+        Iterables.getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now());
+    assertThat(outputBundle.getElements(), emptyIterable()); // single, empty output bundle
     assertThat(
         leftSideResult.getTransform(),
         Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index a8cd8d7..1790b2d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -19,21 +19,25 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Iterables;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.TestClock;
+import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.TestStreamIndex;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,24 +66,85 @@ public class TestStreamEvaluatorFactoryTest {
         p.apply(
             TestStream.create(VarIntCoder.of())
                 .addElements(1, 2, 3)
-                .addElements(4, 5, 6)
+                .advanceWatermarkTo(new Instant(0))
+                .addElements(
+                    TimestampedValue.atMinimumTimestamp(4),
+                    TimestampedValue.atMinimumTimestamp(5),
+                    TimestampedValue.atMinimumTimestamp(6))
+                .advanceProcessingTime(Duration.standardMinutes(10))
                 .advanceWatermarkToInfinity());
 
+    TestClock clock = new TestClock();
+    when(context.getClock()).thenReturn(clock);
+    when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     when(context.createBundle(streamVals))
         .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));
 
-    TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), null);
+    Collection<CommittedBundle<?>> initialInputs =
+        factory.getInitialInputs(streamVals.getProducingTransformInternal());
+    @SuppressWarnings("unchecked")
+    CommittedBundle<TestStreamIndex<Integer>> initialBundle =
+        (CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs);
+
+    TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle);
+    firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements()));
     TransformResult firstResult = firstEvaluator.finishBundle();
 
-    TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), null);
+    @SuppressWarnings("unchecked") WindowedValue<TestStreamIndex<Integer>> firstResidual =
+        (WindowedValue<TestStreamIndex<Integer>>)
+            Iterables.getOnlyElement(firstResult.getUnprocessedElements());
+    assertThat(firstResidual.getValue().getIndex(), equalTo(1));
+    assertThat(firstResidual.getTimestamp(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    CommittedBundle<TestStreamIndex<Integer>> secondBundle =
+        initialBundle.withElements(Collections.singleton(firstResidual));
+    TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle);
+    secondEvaluator.processElement(firstResidual);
     TransformResult secondResult = secondEvaluator.finishBundle();
 
-    TransformEvaluator<Object> thirdEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), null);
+    @SuppressWarnings("unchecked") WindowedValue<TestStreamIndex<Integer>> secondResidual =
+        (WindowedValue<TestStreamIndex<Integer>>)
+            Iterables.getOnlyElement(secondResult.getUnprocessedElements());
+    assertThat(secondResidual.getValue().getIndex(), equalTo(2));
+    assertThat(secondResidual.getTimestamp(), equalTo(new Instant(0)));
+
+    CommittedBundle<TestStreamIndex<Integer>> thirdBundle =
+        secondBundle.withElements(Collections.singleton(secondResidual));
+    TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle);
+    thirdEvaluator.processElement(secondResidual);
     TransformResult thirdResult = thirdEvaluator.finishBundle();
 
+    @SuppressWarnings("unchecked") WindowedValue<TestStreamIndex<Integer>> thirdResidual =
+        (WindowedValue<TestStreamIndex<Integer>>)
+            Iterables.getOnlyElement(thirdResult.getUnprocessedElements());
+    assertThat(thirdResidual.getValue().getIndex(), equalTo(3));
+    assertThat(thirdResidual.getTimestamp(), equalTo(new Instant(0)));
+
+    Instant start = clock.now();
+    CommittedBundle<TestStreamIndex<Integer>> fourthBundle =
+        thirdBundle.withElements(Collections.singleton(thirdResidual));
+    TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle);
+    fourthEvaluator.processElement(thirdResidual);
+    TransformResult fourthResult = fourthEvaluator.finishBundle();
+
+    assertThat(clock.now(), equalTo(start.plus(Duration.standardMinutes(10))));
+    @SuppressWarnings("unchecked") WindowedValue<TestStreamIndex<Integer>> fourthResidual =
+        (WindowedValue<TestStreamIndex<Integer>>)
+            Iterables.getOnlyElement(fourthResult.getUnprocessedElements());
+    assertThat(fourthResidual.getValue().getIndex(), equalTo(4));
+    assertThat(fourthResidual.getTimestamp(), equalTo(new Instant(0)));
+
+    CommittedBundle<TestStreamIndex<Integer>> fifthBundle =
+        fourthBundle.withElements(Collections.singleton(fourthResidual));
+    TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator =
+        factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle);
+    fifthEvaluator.processElement(fourthResidual);
+    TransformResult fifthResult = fifthEvaluator.finishBundle();
+
     assertThat(
         Iterables.getOnlyElement(firstResult.getOutputBundles())
             .commit(Instant.now())
@@ -88,121 +153,18 @@ public class TestStreamEvaluatorFactoryTest {
             WindowedValue.valueInGlobalWindow(1),
             WindowedValue.valueInGlobalWindow(2),
             WindowedValue.valueInGlobalWindow(3)));
-    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
 
     assertThat(
-        Iterables.getOnlyElement(secondResult.getOutputBundles())
+        Iterables.getOnlyElement(thirdResult.getOutputBundles())
             .commit(Instant.now())
             .getElements(),
         Matchers.<WindowedValue<?>>containsInAnyOrder(
             WindowedValue.valueInGlobalWindow(4),
             WindowedValue.valueInGlobalWindow(5),
             WindowedValue.valueInGlobalWindow(6)));
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-
-    assertThat(Iterables.isEmpty(thirdResult.getOutputBundles()), is(true));
-    assertThat(thirdResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-  }
-
-  /** Demonstrates that at most one evaluator for an application is available at a time. */
-  @Test
-  public void onlyOneEvaluatorAtATime() throws Exception {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> streamVals =
-        p.apply(
-            TestStream.create(VarIntCoder.of()).addElements(4, 5, 6).advanceWatermarkToInfinity());
-
-    TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), null);
-
-    // create a second evaluator before the first is finished. The evaluator should not be available
-    TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), null);
-    assertThat(secondEvaluator, is(nullValue()));
-  }
-
-  /**
-   * Demonstrates that multiple applications of the same {@link TestStream} produce separate
-   * evaluators.
-   */
-  @Test
-  public void multipleApplicationsMultipleEvaluators() throws Exception {
-    TestPipeline p = TestPipeline.create();
-    TestStream<Integer> stream =
-        TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity();
-    PCollection<Integer> firstVals = p.apply("Stream One", stream);
-    PCollection<Integer> secondVals = p.apply("Stream A", stream);
-
-    when(context.createBundle(firstVals)).thenReturn(bundleFactory.createBundle(firstVals));
-    when(context.createBundle(secondVals)).thenReturn(bundleFactory.createBundle(secondVals));
-
-    TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(firstVals.getProducingTransformInternal(), null);
-    // The two evaluators can exist independently
-    TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(secondVals.getProducingTransformInternal(), null);
-
-    TransformResult firstResult = firstEvaluator.finishBundle();
-    TransformResult secondResult = secondEvaluator.finishBundle();
-
-    assertThat(
-        Iterables.getOnlyElement(firstResult.getOutputBundles())
-            .commit(Instant.now())
-            .getElements(),
-        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
-    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
 
-    // They both produce equal results, and don't interfere with each other
-    assertThat(
-        Iterables.getOnlyElement(secondResult.getOutputBundles())
-            .commit(Instant.now())
-            .getElements(),
-        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-  }
-
-  /**
-   * Demonstrates that multiple applications of different {@link TestStream} produce independent
-   * evaluators.
-   */
-  @Test
-  public void multipleStreamsMultipleEvaluators() throws Exception {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> firstVals =
-        p.apply(
-            "Stream One",
-            TestStream.create(VarIntCoder.of()).addElements(2).advanceWatermarkToInfinity());
-    PCollection<String> secondVals =
-        p.apply(
-            "Stream A",
-            TestStream.create(StringUtf8Coder.of())
-                .addElements("Two")
-                .advanceWatermarkToInfinity());
-
-    when(context.createBundle(firstVals)).thenReturn(bundleFactory.createBundle(firstVals));
-    when(context.createBundle(secondVals)).thenReturn(bundleFactory.createBundle(secondVals));
-
-    TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(firstVals.getProducingTransformInternal(), null);
-    // The two evaluators can exist independently
-    TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(secondVals.getProducingTransformInternal(), null);
-
-    TransformResult firstResult = firstEvaluator.finishBundle();
-    TransformResult secondResult = secondEvaluator.finishBundle();
-
-    assertThat(
-        Iterables.getOnlyElement(firstResult.getOutputBundles())
-            .commit(Instant.now())
-            .getElements(),
-        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow(2)));
-    assertThat(firstResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-
-    assertThat(
-        Iterables.getOnlyElement(secondResult.getOutputBundles())
-            .commit(Instant.now())
-            .getElements(),
-        Matchers.<WindowedValue<?>>containsInAnyOrder(WindowedValue.valueInGlobalWindow("Two")));
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    assertThat(fifthResult.getOutputBundles(), Matchers.emptyIterable());
+    assertThat(fifthResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(fifthResult.getUnprocessedElements(), Matchers.emptyIterable());
   }
 }