You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/03 21:12:19 UTC

[3/3] beam git commit: Reduce visibility of DirectRunner classes

Reduce visibility of DirectRunner classes

Move inner classes of the DirectRunner to reduce total API Surface.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9edd8599
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9edd8599
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9edd8599

Branch: refs/heads/master
Commit: 9edd8599c28228cd5d7d5df1084f7e63684964d8
Parents: b263cb7
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 13:38:37 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 3 14:12:05 2017 -0700

----------------------------------------------------------------------
 .../direct/AbstractModelEnforcement.java        |   1 -
 .../direct/BoundedReadEvaluatorFactory.java     |   2 -
 .../beam/runners/direct/BundleFactory.java      |   3 +-
 .../org/apache/beam/runners/direct/Clock.java   |   2 +-
 .../runners/direct/CloningBundleFactory.java    |   2 -
 .../beam/runners/direct/CommittedBundle.java    |  82 +++++++++++++
 .../beam/runners/direct/CommittedResult.java    |   1 -
 .../beam/runners/direct/CompletionCallback.java |   1 -
 .../CopyOnAccessInMemoryStateInternals.java     |   2 +-
 .../apache/beam/runners/direct/DirectGraph.java |  12 +-
 .../beam/runners/direct/DirectMetrics.java      |   1 -
 .../beam/runners/direct/DirectRunner.java       | 122 ++-----------------
 .../beam/runners/direct/EmptyInputProvider.java |   1 -
 .../runners/direct/EmptyTransformEvaluator.java |  50 --------
 .../beam/runners/direct/EvaluationContext.java  |   3 -
 .../runners/direct/ExecutorServiceFactory.java  |   2 +-
 .../direct/ExecutorServiceParallelExecutor.java |   1 -
 .../runners/direct/FlattenEvaluatorFactory.java |   2 -
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 -
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |   2 -
 .../ImmutabilityCheckingBundleFactory.java      |   2 -
 .../direct/ImmutabilityEnforcementFactory.java  |   1 -
 .../direct/ImmutableListBundleFactory.java      |   2 -
 .../beam/runners/direct/ModelEnforcement.java   |  10 +-
 .../runners/direct/ModelEnforcementFactory.java |   3 +-
 .../beam/runners/direct/NanosOffsetClock.java   |   2 +-
 .../runners/direct/PCollectionViewWriter.java   |  34 ++++++
 .../beam/runners/direct/ParDoEvaluator.java     |   1 -
 .../runners/direct/ParDoEvaluatorFactory.java   |   1 -
 .../direct/PassthroughTransformEvaluator.java   |   1 -
 .../beam/runners/direct/PipelineExecutor.java   |   1 -
 .../beam/runners/direct/RootInputProvider.java  |   1 -
 .../runners/direct/RootProviderRegistry.java    |   1 -
 ...littableProcessElementsEvaluatorFactory.java |   1 -
 .../direct/StatefulParDoEvaluatorFactory.java   |   1 -
 .../runners/direct/StepTransformResult.java     |   3 +-
 .../direct/TestStreamEvaluatorFactory.java      |   2 -
 .../beam/runners/direct/TransformEvaluator.java |   3 +-
 .../direct/TransformEvaluatorFactory.java       |   6 +-
 .../direct/TransformEvaluatorRegistry.java      |   1 -
 .../beam/runners/direct/TransformExecutor.java  |   1 -
 .../beam/runners/direct/TransformResult.java    |   3 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |   2 -
 .../beam/runners/direct/UncommittedBundle.java  |  57 +++++++++
 .../runners/direct/ViewEvaluatorFactory.java    |   3 +-
 .../beam/runners/direct/WatermarkManager.java   |   3 +-
 .../runners/direct/WindowEvaluatorFactory.java  |   2 -
 .../direct/BoundedReadEvaluatorFactoryTest.java |   2 -
 .../direct/CloningBundleFactoryTest.java        |   2 -
 .../runners/direct/CommittedResultTest.java     |  12 +-
 .../beam/runners/direct/DirectMetricsTest.java  |   1 -
 .../runners/direct/EvaluationContextTest.java   |   3 -
 .../direct/FlattenEvaluatorFactoryTest.java     |   2 -
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   2 -
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |   2 -
 .../ImmutabilityCheckingBundleFactoryTest.java  |   2 -
 .../ImmutabilityEnforcementFactoryTest.java     |   1 -
 .../direct/ImmutableListBundleFactoryTest.java  |   2 -
 .../beam/runners/direct/ParDoEvaluatorTest.java |   1 -
 .../StatefulParDoEvaluatorFactoryTest.java      |   2 -
 .../runners/direct/StepTransformResultTest.java |   1 -
 .../direct/TestStreamEvaluatorFactoryTest.java  |   1 -
 .../runners/direct/TransformExecutorTest.java   |   1 -
 .../UnboundedReadEvaluatorFactoryTest.java      |   2 -
 .../direct/ViewEvaluatorFactoryTest.java        |   2 -
 .../runners/direct/WatermarkManagerTest.java    |   2 -
 .../direct/WindowEvaluatorFactoryTest.java      |   2 -
 67 files changed, 214 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index f09164b..40faf5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 0c2afe8..26f9851 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -31,8 +31,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index b1cb9b1..e39b5d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -18,14 +18,13 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
  * A factory that creates {@link UncommittedBundle UncommittedBundles}.
  */
-public interface BundleFactory {
+interface BundleFactory {
   /**
    * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle do not
    * belong to a {@link PCollection}.

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
index 88f8aab..1a93c62 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
@@ -22,7 +22,7 @@ import org.joda.time.Instant;
 /**
  * Access to the current time.
  */
-public interface Clock {
+interface Clock {
   /**
    * Returns the current time as an {@link Instant}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
index 33241e3..68b059f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
new file mode 100644
index 0000000..79a96fe
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
+ * eventually committed. Committed elements are executed by the {@link PTransform PTransforms}
+ * that consume the {@link PCollection} this bundle is
+ * a part of at a later point.
+ * @param <T> the type of elements contained within this bundle
+ */
+interface CommittedBundle<T> {
+  /**
+   * Returns the PCollection that the elements of this bundle belong to.
+   */
+  @Nullable
+  PCollection<T> getPCollection();
+
+  /**
+   * Returns the key that was output in the most recent {@link GroupByKey} in the
+   * execution of this bundle.
+   */
+  StructuralKey<?> getKey();
+
+  /**
+   * Returns an {@link Iterable} containing all of the elements that have been added to this
+   * {@link CommittedBundle}.
+   */
+  Iterable<WindowedValue<T>> getElements();
+
+  /**
+   * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
+   */
+  Instant getMinTimestamp();
+
+  /**
+   * Returns the processing time output watermark at the time the producing {@link PTransform}
+   * committed this bundle. Downstream synchronized processing time watermarks cannot progress
+   * past this point before consuming this bundle.
+   *
+   * <p>This value is no greater than the earliest incomplete processing time or synchronized
+   * processing time {@link TimerData timer} at the time this bundle was committed, including any
+   * timers that fired to produce this bundle.
+   */
+  Instant getSynchronizedProcessingOutputWatermark();
+
+  /**
+   * Return a new {@link CommittedBundle} that is like this one, except calls to
+   * {@link #getElements()} will return the provided elements. This bundle is unchanged.
+   *
+   * <p>The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized
+   * processing output watermark} of the returned {@link CommittedBundle} is equal to the value
+   * returned from the current bundle. This is used to ensure a {@link PTransform} that could not
+   * complete processing on input elements properly holds the synchronized processing time to the
+   * appropriate value.
+   */
+  CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 4db7e18..99abdd3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.direct;
 import com.google.auto.value.AutoValue;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 766259d..7b5ef4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index ef3a053..d2af93c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -60,7 +60,7 @@ import org.joda.time.Instant;
  * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is
  * accessed, an independent copy will be created within this table.
  */
-public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
+class CopyOnAccessInMemoryStateInternals<K> implements StateInternals {
   private final CopyOnAccessInMemoryStateTable table;
 
   private K key;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
index f208f6e..e163d83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -63,27 +63,27 @@ class DirectGraph {
     this.stepNames = stepNames;
   }
 
-  public AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
+  AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
     return producers.get(produced);
   }
 
-  public List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
+  List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
     return primitiveConsumers.get(consumed);
   }
 
-  public Set<AppliedPTransform<?, ?, ?>> getRootTransforms() {
+  Set<AppliedPTransform<?, ?, ?>> getRootTransforms() {
     return rootTransforms;
   }
 
-  public Set<PCollectionView<?>> getViews() {
+  Set<PCollectionView<?>> getViews() {
     return views;
   }
 
-  public String getStepName(AppliedPTransform<?, ?, ?> step) {
+  String getStepName(AppliedPTransform<?, ?, ?> step) {
     return stepNames.get(step);
   }
 
-  public Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() {
+  Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() {
     return stepNames.keySet();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index fb126fb..b6ca492 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeData;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e063bc3..c6168b3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -27,9 +27,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -49,119 +47,14 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
 
 /**
  * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
  * {@link PCollection PCollections}.
  */
 public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
-  /**
-   * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
-   * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
-   * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
-   *
-   * @param <T> the type of elements that can be added to this bundle
-   */
-  interface UncommittedBundle<T> {
-    /**
-     * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
-     */
-    @Nullable
-    PCollection<T> getPCollection();
-
-    /**
-     * Outputs an element to this bundle.
-     *
-     * @param element the element to add to this bundle
-     * @return this bundle
-     */
-    UncommittedBundle<T> add(WindowedValue<T> element);
-
-    /**
-     * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
-     * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
-     * will throw an {@link IllegalStateException} if called after a call to commit.
-     * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
-     *                                   committed
-     */
-    CommittedBundle<T> commit(Instant synchronizedProcessingTime);
-  }
-
-  /**
-   * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
-   * eventually committed. Committed elements are executed by the {@link PTransform PTransforms}
-   * that consume the {@link PCollection} this bundle is
-   * a part of at a later point.
-   * @param <T> the type of elements contained within this bundle
-   */
-  interface CommittedBundle<T> {
-    /**
-     * Returns the PCollection that the elements of this bundle belong to.
-     */
-    @Nullable
-    PCollection<T> getPCollection();
-
-    /**
-     * Returns the key that was output in the most recent {@link GroupByKey} in the
-     * execution of this bundle.
-     */
-    StructuralKey<?> getKey();
-
-    /**
-     * Returns an {@link Iterable} containing all of the elements that have been added to this
-     * {@link CommittedBundle}.
-     */
-    Iterable<WindowedValue<T>> getElements();
-
-    /**
-     * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}.
-     */
-    Instant getMinTimestamp();
-
-    /**
-     * Returns the processing time output watermark at the time the producing {@link PTransform}
-     * committed this bundle. Downstream synchronized processing time watermarks cannot progress
-     * past this point before consuming this bundle.
-     *
-     * <p>This value is no greater than the earliest incomplete processing time or synchronized
-     * processing time {@link TimerData timer} at the time this bundle was committed, including any
-     * timers that fired to produce this bundle.
-     */
-    Instant getSynchronizedProcessingOutputWatermark();
-
-    /**
-     * Return a new {@link CommittedBundle} that is like this one, except calls to
-     * {@link #getElements()} will return the provided elements. This bundle is unchanged.
-     *
-     * <p>The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized
-     * processing output watermark} of the returned {@link CommittedBundle} is equal to the value
-     * returned from the current bundle. This is used to ensure a {@link PTransform} that could not
-     * complete processing on input elements properly holds the synchronized processing time to the
-     * appropriate value.
-     */
-    CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
-  }
-
-  /**
-   * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
-   * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
-   *
-   * @param <ElemT> the type of elements the input {@link PCollection} contains.
-   * @param <ViewT> the type of the PCollectionView this writer writes to.
-   */
-  interface PCollectionViewWriter<ElemT, ViewT> {
-    void add(Iterable<WindowedValue<ElemT>> values);
-  }
-
-  /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */
-  private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
-      ImmutableSet.of(
-          Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
 
   enum Enforcement {
     ENCODABILITY {
@@ -177,11 +70,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       }
     };
 
+    /**
+     * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
+     */
+    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+        ImmutableSet.of(
+            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
     ////////////////////////////////////////////////////////////////////////////////////////////////
     // Utilities for creating enforcements
-    public static Set<Enforcement> enabled(DirectOptions options) {
+    static Set<Enforcement> enabled(DirectOptions options) {
       EnumSet<Enforcement> enabled = EnumSet.noneOf(Enforcement.class);
       if (options.isEnforceEncodability()) {
         enabled.add(ENCODABILITY);
@@ -192,7 +92,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return Collections.unmodifiableSet(enabled);
     }
 
-    public static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements, DirectGraph graph) {
+    static BundleFactory bundleFactoryFor(
+        Set<Enforcement> enforcements, DirectGraph graph) {
       BundleFactory bundleFactory =
           enforcements.contains(Enforcement.ENCODABILITY)
               ? CloningBundleFactory.create()
@@ -430,7 +331,4 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return NanosOffsetClock.create();
     }
   }
-
-  private static class ComplexParDoMatcher {
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 98d4a64..396cdee 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
 
 import java.util.Collection;
 import java.util.Collections;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
deleted file mode 100644
index 85e5e70..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * A {@link TransformEvaluator} that ignores all input and produces no output. The result of
- * invoking {@link #finishBundle()} on this evaluator is to return an
- * {@link TransformResult} with no elements and a timestamp hold equal to
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold
- * will not affect the watermark.
- */
-final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
-  public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) {
-    return new EmptyTransformEvaluator<T>(transform);
-  }
-
-  private final AppliedPTransform<?, ?, ?> transform;
-
-  private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) {
-    this.transform = transform;
-  }
-
-  @Override
-  public void processElement(WindowedValue<T> element) throws Exception {}
-
-  @Override
-  public TransformResult<T> finishBundle() throws Exception {
-    return StepTransformResult.<T>withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
-        .build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 93d6f96..3cdf351 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -35,9 +35,6 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
index 91dc258..f9e9fa9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ExecutorService;
  * another (e.g., if any executor is shut down the remaining executors should continue to process
  * work).
  */
-public interface ExecutorServiceFactory {
+interface ExecutorServiceFactory {
   /**
    * Create a new {@link ExecutorService}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index c802c58..4da62d5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -47,7 +47,6 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 7c6d2a1..341ea4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import com.google.common.collect.Iterables;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Flatten.PCollections;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index d006553..d00e408 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -39,8 +39,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index ac0b14f..1ea8e76 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -31,8 +31,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 8d77e25..9aabddc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -21,9 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.Enforcement;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index 85fc374..8880af9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
 
 import java.util.IdentityHashMap;
 import java.util.Map;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 36264ee..73734d0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index 96dbc2b..d2e9424 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollection;
  *
  * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
  * {@link ModelEnforcement} is provided with the input bundle as part of
- * {@link ModelEnforcementFactory#forBundle(DirectRunner.CommittedBundle, AppliedPTransform)} each
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)} each
  * element before and after that element is provided to an underlying {@link TransformEvaluator},
  * and the output {@link TransformResult} and committed output bundles after the
  * {@link TransformEvaluator} has completed.
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
  * (such as the immutability of input elements). When the element is output or the bundle is
  * completed, the required conditions can be enforced across all elements.
  */
-public interface ModelEnforcement<T> {
+interface ModelEnforcement<T> {
   /**
    * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
    * provided {@link WindowedValue}.
@@ -53,10 +53,10 @@ public interface ModelEnforcement<T> {
   /**
    * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
    * called, producing the provided {@link TransformResult} and
-   * {@link DirectRunner.CommittedBundle output bundles}.
+   * {@link CommittedBundle output bundles}.
    */
   void afterFinish(
-      DirectRunner.CommittedBundle<T> input,
+      CommittedBundle<T> input,
       TransformResult<T> result,
-      Iterable<? extends DirectRunner.CommittedBundle<?>> outputs);
+      Iterable<? extends CommittedBundle<?>> outputs);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
index e0bbfcb..30f1d20 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 /**
@@ -25,6 +24,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
  * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
  * {@link TransformEvaluator} is created.
  */
-public interface ModelEnforcementFactory {
+interface ModelEnforcementFactory {
   <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index 5a2b18d..5e86f4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -23,7 +23,7 @@ import org.joda.time.Instant;
 /**
  * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time.
  */
-public class NanosOffsetClock implements Clock {
+class NanosOffsetClock implements Clock {
   private final long baseMillis;
   private final long nanosAtBaseMillis;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
new file mode 100644
index 0000000..10c6b74
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
+ * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+ *
+ * @param <ElemT> the type of elements the input {@link PCollection} contains.
+ * @param <ViewT> the type of the PCollectionView this writer writes to.
+ */
+interface PCollectionViewWriter<ElemT, ViewT> {
+  void add(Iterable<WindowedValue<ElemT>> values);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 2ea8a91..a3a345f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -29,7 +29,6 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b00c2b6..39595d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index 153af65..c57932c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
index 82f59a7..07212c7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import java.util.Collection;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
index c3df103..88e0769 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.direct;
 
 import java.util.Collection;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index eb9492c..b06a41c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
 import java.util.Map;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten.PCollections;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 5f6b4f7..fb3a962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 7cf3840..f278e08 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 2a2ccab..7e5f824 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -36,7 +35,7 @@ import org.joda.time.Instant;
  * An immutable {@link TransformResult}.
  */
 @AutoValue
-public abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
+abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
 
   public static <InputT> Builder<InputT> withHold(
       AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index cba754e..b5486c0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.TestStream.ElementEvent;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index 79c942b..1a7209d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**
@@ -26,7 +25,7 @@ import org.apache.beam.sdk.util.WindowedValue;
  *
  * @param <InputT> the type of elements that will be passed to {@link #processElement}
  */
-public interface TransformEvaluator<InputT> {
+interface TransformEvaluator<InputT> {
   /**
    * Process an element in the input {@link CommittedBundle}.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index c7bc46f..c187359 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform;
  * <p>{@link TransformEvaluatorFactory TransformEvaluatorFactories} will be reused within a single
  * execution of a {@link Pipeline} but will not be reused across executions.
  */
-public interface TransformEvaluatorFactory {
+interface TransformEvaluatorFactory {
   /**
    * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
    *
@@ -47,13 +47,13 @@ public interface TransformEvaluatorFactory {
    */
   @Nullable
   <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle)
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
       throws Exception;
 
   /**
    * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a
    * {@link Pipeline} is shut down. No more calls to
-   * {@link #forApplication(AppliedPTransform, DirectRunner.CommittedBundle)} will be made after
+   * {@link #forApplication(AppliedPTransform, CommittedBundle)} will be made after
    * a call to {@link #cleanup()}.
    */
   void cleanup() throws Exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index d06c460..a00253a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
 import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.io.Read;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index bbc0aae..26c4f5c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 3a95df7..0b0790e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -36,7 +35,7 @@ import org.joda.time.Instant;
  * <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs
  * so there is not necesssarily a defined output type.
  */
-public interface TransformResult<InputT> {
+interface TransformResult<InputT> {
   /**
    * Returns the {@link AppliedPTransform} that produced this result.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index d3609f8..922a681 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -29,8 +29,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Read.Unbounded;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java
new file mode 100644
index 0000000..07fa138
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundle.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
+ * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
+ * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
+ *
+ * @param <T> the type of elements that can be added to this bundle
+ */
+interface UncommittedBundle<T> {
+  /**
+   * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
+   */
+  @Nullable
+  PCollection<T> getPCollection();
+
+  /**
+   * Outputs an element to this bundle.
+   *
+   * @param element the element to add to this bundle
+   * @return this bundle
+   */
+  UncommittedBundle<T> add(WindowedValue<T> element);
+
+  /**
+   * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
+   * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
+   * will throw an {@link IllegalStateException} if called after a call to commit.
+   * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
+   *                                   committed
+   */
+  CommittedBundle<T> commit(Instant synchronizedProcessingTime);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 8cbe8fc..f4648e9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -50,7 +49,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   @Override
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      DirectRunner.CommittedBundle<?> inputBundle) {
+      CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator = createEvaluator(
             (AppliedPTransform) application);

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 8c04362..b576e00 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -54,7 +54,6 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -127,7 +126,7 @@ import org.joda.time.Instant;
  * Watermark_PCollection = Watermark_Out_ProducingPTransform
  * </pre>
  */
-public class WatermarkManager {
+class WatermarkManager {
   // The number of updates to apply in #tryApplyPendingUpdates
   private static final int MAX_INCREMENTAL_UPDATES = 10;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 2550924..30d507b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 2b5b46d..df7c18e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -39,8 +39,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CountDownLatch;
 import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index c6054b6..7d037d1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -31,8 +31,6 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 68d6eba..077cd43 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -73,7 +73,7 @@ public class CommittedResultTest implements Serializable {
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             bundleFactory.createBundle(created).commit(Instant.now()),
-            Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+            Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
     assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
@@ -81,7 +81,7 @@ public class CommittedResultTest implements Serializable {
 
   @Test
   public void getUncommittedElementsEqualInput() {
-    DirectRunner.CommittedBundle<Integer> bundle =
+    CommittedBundle<Integer> bundle =
         bundleFactory.createBundle(created)
             .add(WindowedValue.valueInGlobalWindow(2))
             .commit(Instant.now());
@@ -89,11 +89,11 @@ public class CommittedResultTest implements Serializable {
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             bundle,
-            Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+            Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
     assertThat(result.getUnprocessedInputs(),
-        Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle));
+        Matchers.<CommittedBundle<?>>equalTo(bundle));
   }
 
   @Test
@@ -102,7 +102,7 @@ public class CommittedResultTest implements Serializable {
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
             null,
-            Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+            Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
     assertThat(result.getUnprocessedInputs(), nullValue());
@@ -110,7 +110,7 @@ public class CommittedResultTest implements Serializable {
 
   @Test
   public void getOutputsEqualInput() {
-    List<? extends DirectRunner.CommittedBundle<?>> outputs =
+    List<? extends CommittedBundle<?>> outputs =
         ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
             WindowingStrategy.globalDefault(),
             PCollection.IsBounded.BOUNDED)).commit(Instant.now()),

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index ee51e9a..d5d0aff 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeData;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index bfbcd79..40582d9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -36,9 +36,6 @@ import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.ByteArrayCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index e07c9f9..7dc01e6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -24,8 +24,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Iterables;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index fefafd0..6dcd5e2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Multiset;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 94514ad..1373219 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -28,8 +28,6 @@ import com.google.common.collect.Multiset;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 838e0bd..95c0ad1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.direct;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index cd3e9b4..1cd5786 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
 
 import java.io.Serializable;
 import java.util.Collections;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Count;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index c5ad0cd..4a392db 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/9edd8599/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 69dbc22..ef8add9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;