You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/28 17:48:32 UTC

[2/2] incubator-beam git commit: Refactor BundleFactory methods

Refactor BundleFactory methods

Remove the inputBundle parameter to createBundle and createKeyedBundle.
This parameter is useless unless the model is capable of propagating keys
between PTransforms.

Remove the PCollection parameter to createRootBundle. createRootBundle
should be a par

Make [Un]CommittedBundle#getPCollection nullable. Bundles are utilized
by the runner to control the processing of elements, but may not always
belong to a PCollection. Update ImmutabilityCheckingBundleFactory to
return an underlying bundle as the result of createRootBundle.

Use createBundle in Root transform evaluators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/759b6cad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/759b6cad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/759b6cad

Branch: refs/heads/master
Commit: 759b6cada9f1b724e32457b900a469f8a113542d
Parents: a1ac222
Author: Thomas Groh <tg...@google.com>
Authored: Tue Sep 27 13:06:35 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 28 10:48:23 2016 -0700

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     |  2 +-
 .../beam/runners/direct/BundleFactory.java      | 15 ++--
 .../beam/runners/direct/DirectRunner.java       |  8 +-
 .../beam/runners/direct/EvaluationContext.java  | 12 +--
 .../direct/ExecutorServiceParallelExecutor.java |  3 +-
 .../runners/direct/FlattenEvaluatorFactory.java |  2 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  7 +-
 .../ImmutabilityCheckingBundleFactory.java      | 18 ++--
 .../direct/ImmutableListBundleFactory.java      | 72 +++++-----------
 .../beam/runners/direct/ParDoEvaluator.java     |  2 +-
 .../beam/runners/direct/StructuralKey.java      | 88 +++++++++++++-------
 .../direct/TestStreamEvaluatorFactory.java      |  2 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |  2 +-
 .../direct/UncommittedBundleOutputManager.java  |  4 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  8 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++---
 .../runners/direct/CommittedResultTest.java     | 10 +--
 .../EncodabilityEnforcementFactoryTest.java     |  6 +-
 .../runners/direct/EvaluationContextTest.java   | 31 ++-----
 .../direct/FlattenEvaluatorFactoryTest.java     | 11 ++-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  | 22 ++---
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     | 31 +++----
 .../ImmutabilityCheckingBundleFactoryTest.java  | 59 +++----------
 .../ImmutabilityEnforcementFactoryTest.java     |  6 +-
 .../direct/ImmutableListBundleFactoryTest.java  | 52 ++----------
 .../beam/runners/direct/ParDoEvaluatorTest.java |  7 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 48 +++++------
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 24 +++---
 .../runners/direct/StepTransformResultTest.java |  4 +-
 .../beam/runners/direct/StructuralKeyTest.java  |  9 ++
 .../direct/TestStreamEvaluatorFactoryTest.java  | 15 ++--
 .../runners/direct/TransformExecutorTest.java   | 12 +--
 .../UnboundedReadEvaluatorFactoryTest.java      | 28 +++----
 .../direct/ViewEvaluatorFactoryTest.java        |  2 +-
 .../runners/direct/WatermarkManagerTest.java    | 38 ++++-----
 .../direct/WindowEvaluatorFactoryTest.java      |  6 +-
 36 files changed, 297 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 9c77946..2260135 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -132,7 +132,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           source.createReader(evaluationContext.getPipelineOptions())) {
         boolean contentsRemaining = reader.start();
         UncommittedBundle<OutputT> output =
-            evaluationContext.createRootBundle(transform.getOutput());
+            evaluationContext.createBundle(transform.getOutput());
         while (contentsRemaining) {
           output.add(
               WindowedValue.timestampedValueInGlobalWindow(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index 0241d87..b1cb9b1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -28,22 +27,24 @@ import org.apache.beam.sdk.values.PCollection;
  */
 public interface BundleFactory {
   /**
-   * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to
-   * the {@code output} {@link PCollection}.
+   * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle do not
+   * belong to a {@link PCollection}.
+   *
+   * <p>For use in creating inputs to root transforms.
    */
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
+  <T> UncommittedBundle<T> createRootBundle();
 
   /**
    * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
    * belong to the {@code output} {@link PCollection}.
    */
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);
+  <T> UncommittedBundle<T> createBundle(PCollection<T> output);
 
   /**
    * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
    * {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
    * belong to the {@code output} {@link PCollection}.
    */
-  public <K, T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output);
+  <K, T> UncommittedBundle<T> createKeyedBundle(
+      StructuralKey<K> key, PCollection<T> output);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index d8d82bd..a3d20f6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -90,13 +91,13 @@ public class DirectRunner
    *
    * @param <T> the type of elements that can be added to this bundle
    */
-  public static interface UncommittedBundle<T> {
+  interface UncommittedBundle<T> {
     /**
      * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
      */
+    @Nullable
     PCollection<T> getPCollection();
 
-
     /**
      * Outputs an element to this bundle.
      *
@@ -122,10 +123,11 @@ public class DirectRunner
    * a part of at a later point.
    * @param <T> the type of elements contained within this bundle
    */
-  public static interface CommittedBundle<T> {
+  interface CommittedBundle<T> {
     /**
      * Returns the PCollection that the elements of this bundle belong to.
      */
+    @Nullable
     PCollection<T> getPCollection();
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 5af25bc..16cf096 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -226,16 +226,16 @@ class EvaluationContext {
   /**
    * Create a {@link UncommittedBundle} for use by a source.
    */
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return bundleFactory.createRootBundle(output);
+  public <T> UncommittedBundle<T> createRootBundle() {
+    return bundleFactory.createRootBundle();
   }
 
   /**
    * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
    * PCollection}.
    */
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
-    return bundleFactory.createBundle(input, output);
+  public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
+    return bundleFactory.createBundle(output);
   }
 
   /**
@@ -243,8 +243,8 @@ class EvaluationContext {
    * {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}.
    */
   public <K, T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
-    return bundleFactory.createKeyedBundle(input, key, output);
+      StructuralKey<K> key, PCollection<T> output) {
+    return bundleFactory.createKeyedBundle(key, output);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index e765bd3..9e11f6d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -422,8 +422,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
               @SuppressWarnings({"unchecked", "rawtypes"})
               CommittedBundle<?> bundle =
                   evaluationContext
-                      .createKeyedBundle(
-                          null, keyTimers.getKey(), (PCollection) transform.getInput())
+                      .createKeyedBundle(keyTimers.getKey(), (PCollection) transform.getInput())
                       .add(WindowedValue.valueInEmptyWindows(work))
                       .commit(evaluationContext.now());
               scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 456921c..4fa8854 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -64,7 +64,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
           null, StepTransformResult.withoutHold(application).build());
     }
     final UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, application.getOutput());
+        evaluationContext.createBundle(application.getOutput());
     final TransformResult result =
         StepTransformResult.withoutHold(application).addOutput(outputBundle).build();
     return new FlattenEvaluator<>(outputBundle, result);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 61d0e7b..2ead782 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -153,10 +153,9 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
         K key = groupedEntry.getKey().key;
         KeyedWorkItem<K, V> groupedKv =
             KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle(
-            inputBundle,
-            StructuralKey.of(key, keyCoder),
-            application.getOutput());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
+            evaluationContext.createKeyedBundle(
+                StructuralKey.of(key, keyCoder), application.getOutput());
         bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
         resultBuilder.addOutput(bundle);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 71bd8b4..08c6e78 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -58,20 +58,26 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
     this.underlying = checkNotNull(underlying);
   }
 
+  /**
+   * {@inheritDoc}.
+   *
+   * @return a root bundle created by the underlying {@link PCollection}. Root bundles belong to the
+   * runner, which is required to use the contents in a way that is mutation-safe.
+   */
   @Override
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
+  public <T> UncommittedBundle<T> createRootBundle() {
+    return underlying.createRootBundle();
   }
 
   @Override
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
-    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output));
+  public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(output));
   }
 
   @Override
   public <K, T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
-    return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
+      StructuralKey<K> key, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(key, output));
   }
 
   private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index e79da7b..53b7e54 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -19,11 +19,11 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.MoreObjects;
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -39,18 +39,18 @@ class ImmutableListBundleFactory implements BundleFactory {
   private ImmutableListBundleFactory() {}
 
   @Override
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return UncommittedImmutableListBundle.create(output, StructuralKey.of(null, VoidCoder.of()));
+  public <T> UncommittedBundle<T> createRootBundle() {
+    return UncommittedImmutableListBundle.create(null, StructuralKey.empty());
   }
 
   @Override
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
-    return UncommittedImmutableListBundle.create(output, input.getKey());
+  public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
+    return UncommittedImmutableListBundle.create(output, StructuralKey.empty());
   }
 
   @Override
   public <K, T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
+      StructuralKey<K> key, PCollection<T> output) {
     return UncommittedImmutableListBundle.create(output, key);
   }
 
@@ -99,63 +99,29 @@ class ImmutableListBundleFactory implements BundleFactory {
       checkState(!committed, "Can't commit already committed bundle %s", this);
       committed = true;
       final Iterable<WindowedValue<T>> committedElements = elements.build();
-      return new CommittedImmutableListBundle<>(
+      return CommittedImmutableListBundle.create(
           pcollection, key, committedElements, synchronizedCompletionTime);
     }
   }
 
-  private static class CommittedImmutableListBundle<T> implements CommittedBundle<T> {
-    public CommittedImmutableListBundle(
-        PCollection<T> pcollection,
+  @AutoValue
+  abstract static class CommittedImmutableListBundle<T> implements CommittedBundle<T> {
+    public static <T> CommittedImmutableListBundle<T> create(
+        @Nullable PCollection<T> pcollection,
         StructuralKey<?> key,
         Iterable<WindowedValue<T>> committedElements,
         Instant synchronizedCompletionTime) {
-      this.pcollection = pcollection;
-      this.key = key;
-      this.committedElements = committedElements;
-      this.synchronizedCompletionTime = synchronizedCompletionTime;
-    }
-
-    private final PCollection<T> pcollection;
-    /** The structural value key of the Bundle, as specified by the coder that created it. */
-    private final StructuralKey<?> key;
-    private final Iterable<WindowedValue<T>> committedElements;
-    private final Instant synchronizedCompletionTime;
-
-    @Override
-    public StructuralKey<?> getKey() {
-      return key;
-    }
-
-    @Override
-    public Iterable<WindowedValue<T>> getElements() {
-      return committedElements;
-    }
-
-    @Override
-    public PCollection<T> getPCollection() {
-      return pcollection;
-    }
-
-    @Override
-    public Instant getSynchronizedProcessingOutputWatermark() {
-      return synchronizedCompletionTime;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .omitNullValues()
-          .add("pcollection", pcollection)
-          .add("key", key)
-          .add("elements", committedElements)
-          .toString();
+      return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle(
+          pcollection, key, committedElements, synchronizedCompletionTime);
     }
 
     @Override
     public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
-      return new CommittedImmutableListBundle<>(
-          pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime);
+      return create(
+          getPCollection(),
+          getKey(),
+          ImmutableList.copyOf(elements),
+          getSynchronizedProcessingOutputWatermark());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 99ab22a..a761289 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -60,7 +60,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
       outputBundles.put(
           outputEntry.getKey(),
-          evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
+          evaluationContext.createBundle(outputEntry.getValue()));
     }
 
     ReadyCheckingSideInputReader sideInputReader =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
index 249ccfe..61332f9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
@@ -26,52 +26,78 @@ import org.apache.beam.sdk.util.CoderUtils;
  * A (Key, Coder) pair that uses the structural value of the key (as provided by
  * {@link Coder#structuralValue(Object)}) to perform equality and hashing.
  */
-class StructuralKey<K> {
+abstract class StructuralKey<K> {
+
+  private StructuralKey() {
+    // Prevents extending outside of this class
+  }
+
+  /**
+   * Returns the key that this {@link StructuralKey} was created from.
+   */
+  public abstract K getKey();
+
+  /**
+   * Get the empty {@link StructuralKey}. All instances of the empty key are considered equal.
+   */
+  static StructuralKey<?> empty() {
+    StructuralKey<Object> emptyKey = new StructuralKey<Object>() {
+      @Override
+      public Object getKey() {
+        return this;
+      }
+    };
+    return emptyKey;
+  }
+
   /**
    * Create a new Structural Key of the provided key that can be encoded by the provided coder.
    */
-  public static <K> StructuralKey<K> of(K key, Coder<K> coder) {
+  static <K> StructuralKey<K> of(K key, Coder<K> coder) {
     try {
-      return new StructuralKey<>(coder, key);
+      return new CoderStructuralKey<>(coder, key);
     } catch (Exception e) {
       throw new IllegalArgumentException(
           "Could not encode a key with its provided coder " + coder.getClass().getSimpleName(), e);
     }
   }
 
-  private final Coder<K> coder;
-  private final Object structuralValue;
-  private final byte[] encoded;
-
-  private StructuralKey(Coder<K> coder, K key) throws Exception {
-    this.coder = coder;
-    this.structuralValue = coder.structuralValue(key);
-    this.encoded = CoderUtils.encodeToByteArray(coder, key);
-  }
+  private static class CoderStructuralKey<K> extends StructuralKey<K> {
+    private final Coder<K> coder;
+    private final Object structuralValue;
+    private final byte[] encoded;
 
-  public K getKey() {
-    try {
-      return CoderUtils.decodeFromByteArray(coder, encoded);
-    } catch (CoderException e) {
-      throw new IllegalArgumentException(
-          "Could not decode Key with coder of type " + coder.getClass().getSimpleName());
+    private CoderStructuralKey(Coder<K> coder, K key) throws Exception {
+      this.coder = coder;
+      this.structuralValue = coder.structuralValue(key);
+      this.encoded = CoderUtils.encodeToByteArray(coder, key);
     }
-  }
 
-  @Override
-  public boolean equals(Object other) {
-    if (other == this) {
-      return true;
+    @Override
+    public K getKey() {
+      try {
+        return CoderUtils.decodeFromByteArray(coder, encoded);
+      } catch (CoderException e) {
+        throw new IllegalArgumentException(
+            "Could not decode Key with coder of type " + coder.getClass().getSimpleName(), e);
+      }
     }
-    if (other instanceof StructuralKey) {
-      StructuralKey that = (StructuralKey) other;
-      return structuralValue.equals(that.structuralValue);
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (other instanceof CoderStructuralKey) {
+        CoderStructuralKey that = (CoderStructuralKey) other;
+        return structuralValue.equals(that.structuralValue);
+      }
+      return false;
     }
-    return false;
-  }
 
-  @Override
-  public int hashCode() {
-    return structuralValue.hashCode();
+    @Override
+    public int hashCode() {
+      return structuralValue.hashCode();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2adff59..5a94143 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -124,7 +124,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
         StepTransformResult.Builder result =
             StepTransformResult.withHold(application, currentWatermark);
         if (event.getType().equals(EventType.ELEMENT)) {
-          UncommittedBundle<T> bundle = context.createRootBundle(application.getOutput());
+          UncommittedBundle<T> bundle = context.createBundle(application.getOutput());
           for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
             bundle.add(
                 WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 9fb3dbf..557c9a8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -171,7 +171,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
 
     @Override
     public TransformResult finishBundle() throws IOException {
-      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
+      UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
       try {
         boolean elementAvailable = startReader();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
index d40dc11..6c7dec8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
  * {@link DirectRunner}.
  */
-public class UncommittedBundleOutputManager implements OutputManager {
+class UncommittedBundleOutputManager implements OutputManager {
   private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
 
   public static UncommittedBundleOutputManager create(
@@ -36,7 +36,7 @@ public class UncommittedBundleOutputManager implements OutputManager {
     return new UncommittedBundleOutputManager(outputBundles);
   }
 
-  public UncommittedBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+  UncommittedBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
     this.bundles = bundles;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 47848e6..eb53b7f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -50,16 +50,14 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
       @Nullable CommittedBundle<?> inputBundle
  )
       throws Exception {
-    return createTransformEvaluator(
-        (AppliedPTransform) application, inputBundle);
+    return createTransformEvaluator((AppliedPTransform) application);
   }
 
   private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
-      CommittedBundle<?> inputBundle) {
+      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform) {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, transform.getOutput());
+        evaluationContext.createBundle(transform.getOutput());
     if (fn == null) {
       return PassthroughTransformEvaluator.create(transform, outputBundle);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index cdd1661..e725dd3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -77,8 +77,8 @@ public class BoundedReadEvaluatorFactoryTest {
 
   @Test
   public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(output);
 
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null);
@@ -97,8 +97,8 @@ public class BoundedReadEvaluatorFactoryTest {
    */
   @Test
   public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(output);
 
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null);
@@ -111,8 +111,8 @@ public class BoundedReadEvaluatorFactoryTest {
         containsInAnyOrder(
             gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
 
-    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(secondOutput);
+    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null);
     assertThat(secondEvaluator, nullValue());
@@ -124,9 +124,9 @@ public class BoundedReadEvaluatorFactoryTest {
    */
   @Test
   public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
-    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(longs);
+    UncommittedBundle<Long> secondOutput = bundleFactory.createBundle(longs);
+    when(context.createBundle(longs)).thenReturn(output).thenReturn(secondOutput);
 
     // create both evaluators before finishing either.
     TransformEvaluator<?> evaluator =
@@ -160,8 +160,8 @@ public class BoundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
     evaluator.finishBundle();
@@ -178,8 +178,8 @@ public class BoundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
+    UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+    when(context.createBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
     evaluator.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index efc6d2f..00dca20 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -63,7 +63,7 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            bundleFactory.createRootBundle(created).commit(Instant.now()),
+            bundleFactory.createBundle(created).commit(Instant.now()),
             Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
@@ -73,7 +73,7 @@ public class CommittedResultTest implements Serializable {
   @Test
   public void getUncommittedElementsEqualInput() {
     DirectRunner.CommittedBundle<Integer> bundle =
-        bundleFactory.createRootBundle(created)
+        bundleFactory.createBundle(created)
             .add(WindowedValue.valueInGlobalWindow(2))
             .commit(Instant.now());
     CommittedResult result =
@@ -102,16 +102,16 @@ public class CommittedResultTest implements Serializable {
   @Test
   public void getOutputsEqualInput() {
     List<? extends DirectRunner.CommittedBundle<?>> outputs =
-        ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
+        ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
             WindowingStrategy.globalDefault(),
             PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
-            bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
+            bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
                 WindowingStrategy.globalDefault(),
                 PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            bundleFactory.createRootBundle(created).commit(Instant.now()),
+            bundleFactory.createBundle(created).commit(Instant.now()),
             outputs,
             EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
index e0ccbe5..4da4aad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactoryTest.java
@@ -110,7 +110,7 @@ public class EncodabilityEnforcementFactoryTest {
     WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record());
 
     CommittedBundle<Record> input =
-        bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
+        bundleFactory.createBundle(unencodable).add(record).commit(Instant.now());
     ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
 
     enforcement.beforeElement(record);
@@ -127,7 +127,7 @@ public class EncodabilityEnforcementFactoryTest {
     AppliedPTransform<?, ?, ?> consumer =
         unencodable.apply(Count.<T>globally()).getProducingTransformInternal();
     CommittedBundle<T> input =
-        bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now());
+        bundleFactory.createBundle(unencodable).add(record).commit(Instant.now());
     ModelEnforcement<T> enforcement = factory.forBundle(input, consumer);
     return enforcement;
   }
@@ -142,7 +142,7 @@ public class EncodabilityEnforcementFactoryTest {
     WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
 
     CommittedBundle<Integer> input =
-        bundleFactory.createRootBundle(unencodable).add(value).commit(Instant.now());
+        bundleFactory.createBundle(unencodable).add(value).commit(Instant.now());
     ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
 
     enforcement.beforeElement(value);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index f59dbba..bc53570 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -165,8 +165,9 @@ public class EvaluationContextTest {
     DirectStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
-    context.handleResult(ImmutableListBundleFactory.create()
-            .createKeyedBundle(null, StructuralKey.of("foo", StringUtf8Coder.of()), created)
+    context.handleResult(
+        ImmutableListBundleFactory.create()
+            .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), created)
             .commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(created.getProducingTransformInternal())
@@ -264,7 +265,7 @@ public class EvaluationContextTest {
             .withAggregatorChanges(mutatorAgain)
             .build();
     context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
+        context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         secondResult);
     assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(16L));
@@ -291,7 +292,7 @@ public class EvaluationContextTest {
             .build();
 
     context.handleResult(
-        context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
+        context.createKeyedBundle(myKey, created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         stateResult);
 
@@ -374,7 +375,7 @@ public class EvaluationContextTest {
     // haven't added any timers, must be empty
     assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
     context.handleResult(
-        context.createKeyedBundle(null, key, created).commit(Instant.now()),
+        context.createKeyedBundle(key, created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         timerResult);
 
@@ -405,24 +406,10 @@ public class EvaluationContextTest {
   }
 
   @Test
-  public void createBundleKeyedResultPropagatesKey() {
-    StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
-    CommittedBundle<KV<String, Integer>> newBundle =
-        context
-            .createBundle(
-                bundleFactory.createKeyedBundle(
-                    null, key,
-                    created).commit(Instant.now()),
-                downstream).commit(Instant.now());
-    assertThat(newBundle.getKey(), Matchers.<StructuralKey<?>>equalTo(key));
-  }
-
-  @Test
   public void createKeyedBundleKeyed() {
     StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
     CommittedBundle<KV<String, Integer>> keyedBundle =
         context.createKeyedBundle(
-            bundleFactory.createRootBundle(created).commit(Instant.now()),
             key,
             downstream).commit(Instant.now());
     assertThat(keyedBundle.getKey(),
@@ -472,7 +459,7 @@ public class EvaluationContextTest {
     context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
     assertThat(context.isDone(), is(false));
 
-    UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
+    UncommittedBundle<Integer> rootBundle = context.createBundle(created);
     rootBundle.add(WindowedValue.valueInGlobalWindow(1));
     CommittedResult handleResult =
         context.handleResult(
@@ -514,14 +501,14 @@ public class EvaluationContextTest {
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
     context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
+        context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
     context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
 
     context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
+        context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
     context.extractFiredTimers();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 3bae1ab..435fc94 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -55,17 +55,16 @@ public class FlattenEvaluatorFactoryTest {
     PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
 
     CommittedBundle<Integer> leftBundle =
-        bundleFactory.createRootBundle(left).commit(Instant.now());
+        bundleFactory.createBundle(left).commit(Instant.now());
     CommittedBundle<Integer> rightBundle =
-        bundleFactory.createRootBundle(right).commit(Instant.now());
+        bundleFactory.createBundle(right).commit(Instant.now());
 
     EvaluationContext context = mock(EvaluationContext.class);
 
-    UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createRootBundle(flattened);
-    UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createRootBundle(flattened);
+    UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createBundle(flattened);
+    UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createBundle(flattened);
 
-    when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle);
-    when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle);
+    when(context.createBundle(flattened)).thenReturn(flattenedLeftBundle, flattenedRightBundle);
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context);
     TransformEvaluator<Integer> leftSideEvaluator =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 9395017..49d7d90 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -70,27 +70,27 @@ public class GroupByKeyEvaluatorFactoryTest {
         kvs.apply(new DirectGroupByKeyOnly<String, Integer>());
 
     CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
-        bundleFactory.createRootBundle(kvs).commit(Instant.now());
+        bundleFactory.createBundle(kvs).commit(Instant.now());
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
     UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        bundleFactory.createKeyedBundle(null, fooKey, groupedKvs);
+        bundleFactory.createKeyedBundle(fooKey, groupedKvs);
 
     StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
     UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        bundleFactory.createKeyedBundle(null, barKey, groupedKvs);
+        bundleFactory.createKeyedBundle(barKey, groupedKvs);
 
     StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
     UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        bundleFactory.createKeyedBundle(null, bazKey, groupedKvs);
+        bundleFactory.createKeyedBundle(bazKey, groupedKvs);
 
-    when(evaluationContext.createKeyedBundle(inputBundle,
+    when(evaluationContext.createKeyedBundle(
         fooKey,
         groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle,
+    when(evaluationContext.createKeyedBundle(
         barKey,
         groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle,
+    when(evaluationContext.createKeyedBundle(
         bazKey,
         groupedKvs)).thenReturn(bazBundle);
 
@@ -114,7 +114,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     assertThat(
         fooBundle.commit(Instant.now()).getElements(),
         contains(
-            new KeyedWorkItemMatcher<String, Integer>(
+            new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
                     "foo",
                     ImmutableSet.of(
@@ -125,7 +125,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     assertThat(
         barBundle.commit(Instant.now()).getElements(),
         contains(
-            new KeyedWorkItemMatcher<String, Integer>(
+            new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
                     "bar",
                     ImmutableSet.of(
@@ -135,7 +135,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     assertThat(
         bazBundle.commit(Instant.now()).getElements(),
         contains(
-            new KeyedWorkItemMatcher<String, Integer>(
+            new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
                     "baz",
                     ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
@@ -151,7 +151,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     private final KeyedWorkItem<K, V> myWorkItem;
     private final Coder<K> keyCoder;
 
-    public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) {
+    KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) {
       this.myWorkItem = myWorkItem;
       this.keyCoder = keyCoder;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 814a89a..3b9dc39 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -70,31 +70,22 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
         kvs.apply(new DirectGroupByKeyOnly<String, Integer>());
 
     CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
-        bundleFactory.createRootBundle(kvs).commit(Instant.now());
+        bundleFactory.createBundle(kvs).commit(Instant.now());
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
 
     StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = bundleFactory.createKeyedBundle(
-        null, fooKey,
-        groupedKvs);
+    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
+        bundleFactory.createKeyedBundle(fooKey, groupedKvs);
     StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = bundleFactory.createKeyedBundle(
-        null, barKey,
-        groupedKvs);
+    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
+        bundleFactory.createKeyedBundle(barKey, groupedKvs);
     StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = bundleFactory.createKeyedBundle(
-        null, bazKey,
-        groupedKvs);
-
-    when(evaluationContext.createKeyedBundle(inputBundle,
-        fooKey,
-        groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle,
-        barKey,
-        groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle,
-        bazKey,
-        groupedKvs)).thenReturn(bazBundle);
+    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
+        bundleFactory.createKeyedBundle(bazKey, groupedKvs);
+
+    when(evaluationContext.createKeyedBundle(fooKey, groupedKvs)).thenReturn(fooBundle);
+    when(evaluationContext.createKeyedBundle(barKey, groupedKvs)).thenReturn(barBundle);
+    when(evaluationContext.createKeyedBundle(bazKey, groupedKvs)).thenReturn(bazBundle);
 
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index d44151a..d445944 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -60,8 +60,8 @@ public class ImmutabilityCheckingBundleFactoryTest {
   }
 
   @Test
-  public void noMutationRootBundleSucceeds() {
-    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+  public void rootBundleSucceeds() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle();
     byte[] array = new byte[] {0, 1, 2};
     root.add(WindowedValue.valueInGlobalWindow(array));
     CommittedBundle<byte[]> committed = root.commit(Instant.now());
@@ -72,10 +72,8 @@ public class ImmutabilityCheckingBundleFactoryTest {
 
   @Test
   public void noMutationKeyedBundleSucceeds() {
-    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
-        StructuralKey.of("mykey", StringUtf8Coder.of()),
-        transformed);
+    UncommittedBundle<byte[]> keyed =
+        factory.createKeyedBundle(StructuralKey.of("mykey", StringUtf8Coder.of()), transformed);
 
     WindowedValue<byte[]> windowedArray =
         WindowedValue.of(
@@ -91,8 +89,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
 
   @Test
   public void noMutationCreateBundleSucceeds() {
-    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(transformed);
 
     WindowedValue<byte[]> windowedArray =
         WindowedValue.of(
@@ -107,23 +104,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
   }
 
   @Test
-  public void mutationBeforeAddRootBundleSucceeds() {
-    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
-    byte[] array = new byte[] {0, 1, 2};
-    array[1] = 2;
-    root.add(WindowedValue.valueInGlobalWindow(array));
-    CommittedBundle<byte[]> committed = root.commit(Instant.now());
-
-    assertThat(
-        committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
-  }
-
-  @Test
   public void mutationBeforeAddKeyedBundleSucceeds() {
-    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
-        StructuralKey.of("mykey", StringUtf8Coder.of()),
-        transformed);
+    UncommittedBundle<byte[]> keyed =
+        factory.createKeyedBundle(StructuralKey.of("mykey", StringUtf8Coder.of()), transformed);
 
     byte[] array = new byte[] {4, 8, 12};
     array[0] = Byte.MAX_VALUE;
@@ -141,8 +124,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
 
   @Test
   public void mutationBeforeAddCreateBundleSucceeds() {
-    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(transformed);
 
     byte[] array = new byte[] {4, 8, 12};
     WindowedValue<byte[]> windowedArray =
@@ -159,23 +141,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
   }
 
   @Test
-  public void mutationAfterAddRootBundleThrows() {
-    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
-    byte[] array = new byte[] {0, 1, 2};
-    root.add(WindowedValue.valueInGlobalWindow(array));
-
-    array[1] = 2;
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("Values must not be mutated in any way after being output");
-    CommittedBundle<byte[]> committed = root.commit(Instant.now());
-  }
-
-  @Test
   public void mutationAfterAddKeyedBundleThrows() {
-    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
-        StructuralKey.of("mykey", StringUtf8Coder.of()),
-        transformed);
+    UncommittedBundle<byte[]> keyed =
+        factory.createKeyedBundle(StructuralKey.of("mykey", StringUtf8Coder.of()), transformed);
 
     byte[] array = new byte[] {4, 8, 12};
     WindowedValue<byte[]> windowedArray =
@@ -189,13 +157,12 @@ public class ImmutabilityCheckingBundleFactoryTest {
     array[0] = Byte.MAX_VALUE;
     thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being output");
-    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+    keyed.commit(Instant.now());
   }
 
   @Test
   public void mutationAfterAddCreateBundleThrows() {
-    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(transformed);
 
     byte[] array = new byte[] {4, 8, 12};
     WindowedValue<byte[]> windowedArray =
@@ -209,7 +176,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
     array[2] = -3;
     thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being output");
-    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+    intermediate.commit(Instant.now());
   }
 
   private static class IdentityDoFn<T> extends OldDoFn<T, T> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 713ae35..812d7d5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -71,7 +71,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
   public void unchangedSucceeds() {
     WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
     CommittedBundle<byte[]> elements =
-        bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
+        bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
 
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
     enforcement.beforeElement(element);
@@ -86,7 +86,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
   public void mutatedDuringProcessElementThrows() {
     WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
     CommittedBundle<byte[]> elements =
-        bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
+        bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
 
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
     enforcement.beforeElement(element);
@@ -107,7 +107,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
 
     WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
     CommittedBundle<byte[]> elements =
-        bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now());
+        bundleFactory.createBundle(pcollection).add(element).commit(Instant.now());
 
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
     enforcement.beforeElement(element);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 43108f8..4a7477f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -69,27 +69,14 @@ public class ImmutableListBundleFactoryTest {
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
   }
 
-  @Test
-  public void createRootBundleShouldCreateWithEmptyKey() {
-    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
-
-    UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection);
-
-    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-
-    assertThat(bundle.getKey(),
-        Matchers.<StructuralKey<?>>equalTo(StructuralKey.of(null, VoidCoder.of())));
-  }
-
   private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
     PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
-    StructuralKey skey = StructuralKey.of(key, coder);
+    StructuralKey<?> skey = StructuralKey.of(key, coder);
 
-    UncommittedBundle<Integer> inFlightBundle =
-        bundleFactory.createKeyedBundle(null, skey, pcollection);
+    UncommittedBundle<Integer> inFlightBundle = bundleFactory.createKeyedBundle(skey, pcollection);
 
     CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-    assertThat(bundle.getKey(), equalTo(skey));
+    assertThat(bundle.getKey(), Matchers.<StructuralKey<?>>equalTo(skey));
   }
 
   @Test
@@ -106,9 +93,7 @@ public class ImmutableListBundleFactoryTest {
 
   private <T> CommittedBundle<T>
   afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
-    PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
-
-    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection);
+    UncommittedBundle<T> bundle = bundleFactory.createRootBundle();
     Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
     for (WindowedValue<T> elem : elems) {
       bundle.add(elem);
@@ -168,9 +153,7 @@ public class ImmutableListBundleFactoryTest {
 
   @Test
   public void addAfterCommitShouldThrowException() {
-    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
-
-    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle();
     bundle.add(WindowedValue.valueInGlobalWindow(1));
     CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
     assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
@@ -184,9 +167,7 @@ public class ImmutableListBundleFactoryTest {
 
   @Test
   public void commitAfterCommitShouldThrowException() {
-    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
-
-    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle();
     bundle.add(WindowedValue.valueInGlobalWindow(1));
     CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
     assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
@@ -198,29 +179,8 @@ public class ImmutableListBundleFactoryTest {
   }
 
   @Test
-  public void createBundleUnkeyedResultUnkeyed() {
-    CommittedBundle<KV<String, Integer>> newBundle =
-        bundleFactory
-            .createBundle(bundleFactory.createRootBundle(created).commit(Instant.now()), downstream)
-            .commit(Instant.now());
-  }
-
-  @Test
-  public void createBundleKeyedResultPropagatesKey() {
-    CommittedBundle<KV<String, Integer>> newBundle =
-        bundleFactory.createBundle(
-            bundleFactory.createKeyedBundle(
-                null,
-                StructuralKey.of("foo", StringUtf8Coder.of()),
-                created).commit(Instant.now()),
-            downstream).commit(Instant.now());
-    assertThat(newBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));
-  }
-
-  @Test
   public void createKeyedBundleKeyed() {
     CommittedBundle<KV<String, Integer>> keyedBundle = bundleFactory.createKeyedBundle(
-        bundleFactory.createRootBundle(created).commit(Instant.now()),
         StructuralKey.of("foo", StringUtf8Coder.of()),
         downstream).commit(Instant.now());
     assertThat(keyedBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 2a54ecb..1a742f0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -94,10 +94,9 @@ public class ParDoEvaluatorTest {
     PCollection<Integer> output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView));
 
     CommittedBundle<Integer> inputBundle =
-        bundleFactory.createRootBundle(inputPc).commit(Instant.now());
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(inputBundle, output);
-    when(evaluationContext.createBundle(inputBundle, output))
-        .thenReturn(outputBundle);
+        bundleFactory.createBundle(inputPc).commit(Instant.now());
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
+    when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
     ParDoEvaluator<Integer> evaluator =
         createEvaluator(singletonView, fn, inputBundle, output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 94b7f5d..88e1484 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -92,7 +92,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     PCollectionTuple outputTuple = input.apply(pardo);
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
@@ -100,14 +100,13 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
-    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput);
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
+    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput);
 
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, lengthOutput)).thenReturn(lengthOutputBundle);
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
+    when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle);
 
     DirectExecutionContext executionContext =
         new DirectExecutionContext(null, null, null, null);
@@ -182,19 +181,18 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     PCollectionTuple outputTuple = input.apply(pardo);
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
 
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
 
     DirectExecutionContext executionContext =
         new DirectExecutionContext(null, null, null, null);
@@ -274,19 +272,18 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     PCollectionTuple outputTuple = input.apply(pardo);
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
 
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
 
     DirectExecutionContext executionContext = new DirectExecutionContext(null,
         StructuralKey.of("myKey", StringUtf8Coder.of()),
@@ -388,19 +385,18 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     PCollectionTuple outputTuple = input.apply(pardo);
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+        bundleFactory.createBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput);
 
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle);
 
     DirectExecutionContext executionContext = new DirectExecutionContext(null,
         StructuralKey.of("myKey", StringUtf8Coder.of()),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 7207b99..6a02e40 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -80,11 +80,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
                   }
                 }));
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
-    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
+    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
     DirectExecutionContext executionContext =
         new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
@@ -134,11 +134,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
                   }
                 }));
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
-    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection);
+    when(evaluationContext.createBundle(collection)).thenReturn(outputBundle);
     DirectExecutionContext executionContext =
         new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
@@ -197,13 +197,13 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
 
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
+        bundleFactory.createBundle(mainOutput);
 
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
 
     DirectExecutionContext executionContext = new DirectExecutionContext(null,
         StructuralKey.of("myKey", StringUtf8Coder.of()),
@@ -299,13 +299,13 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
     CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
+        bundleFactory.createBundle(input).commit(Instant.now());
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
+        bundleFactory.createBundle(mainOutput);
 
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle);
 
     DirectExecutionContext executionContext = new DirectExecutionContext(null,
         key,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index c06eff9..61f5812 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -55,7 +55,7 @@ public class StepTransformResultTest {
 
   @Test
   public void producedBundlesProducedOutputs() {
-    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pc);
+    UncommittedBundle<Integer> bundle = bundleFactory.createBundle(pc);
     TransformResult result = StepTransformResult.withoutHold(transform).addOutput(bundle)
         .build();
 
@@ -74,7 +74,7 @@ public class StepTransformResultTest {
   @Test
   public void producedBundlesAndAdditionalOutputProducedOutputs() {
     TransformResult result = StepTransformResult.withoutHold(transform)
-        .addOutput(bundleFactory.createRootBundle(pc))
+        .addOutput(bundleFactory.createBundle(pc))
         .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
         .build();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
index 18aeac6..9840c72 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StructuralKeyTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -53,6 +54,14 @@ public class StructuralKeyTest {
   }
 
   @Test
+  public void emptyKeysNotEqual() {
+    StructuralKey<?> empty = StructuralKey.empty();
+
+    assertThat(empty, not(Matchers.<StructuralKey<?>>equalTo(StructuralKey.empty())));
+    assertThat(empty, Matchers.<StructuralKey<?>>equalTo(empty));
+  }
+
+  @Test
   public void objectEqualsTrueKeyEquals() {
     StructuralKey<Integer> original = StructuralKey.of(1234, VarIntCoder.of());
     assertThat(StructuralKey.of(1234, VarIntCoder.of()), equalTo(original));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 7413b25..a8cd8d7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -65,9 +65,8 @@ public class TestStreamEvaluatorFactoryTest {
                 .addElements(4, 5, 6)
                 .advanceWatermarkToInfinity());
 
-    when(context.createRootBundle(streamVals))
-        .thenReturn(
-            bundleFactory.createRootBundle(streamVals), bundleFactory.createRootBundle(streamVals));
+    when(context.createBundle(streamVals))
+        .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));
 
     TransformEvaluator<Object> firstEvaluator =
         factory.forApplication(streamVals.getProducingTransformInternal(), null);
@@ -134,9 +133,8 @@ public class TestStreamEvaluatorFactoryTest {
     PCollection<Integer> firstVals = p.apply("Stream One", stream);
     PCollection<Integer> secondVals = p.apply("Stream A", stream);
 
-    when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
-    when(context.createRootBundle(secondVals))
-        .thenReturn(bundleFactory.createRootBundle(secondVals));
+    when(context.createBundle(firstVals)).thenReturn(bundleFactory.createBundle(firstVals));
+    when(context.createBundle(secondVals)).thenReturn(bundleFactory.createBundle(secondVals));
 
     TransformEvaluator<Object> firstEvaluator =
         factory.forApplication(firstVals.getProducingTransformInternal(), null);
@@ -181,9 +179,8 @@ public class TestStreamEvaluatorFactoryTest {
                 .addElements("Two")
                 .advanceWatermarkToInfinity());
 
-    when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
-    when(context.createRootBundle(secondVals))
-        .thenReturn(bundleFactory.createRootBundle(secondVals));
+    when(context.createBundle(firstVals)).thenReturn(bundleFactory.createBundle(firstVals));
+    when(context.createBundle(secondVals)).thenReturn(bundleFactory.createBundle(secondVals));
 
     TransformEvaluator<Object> firstEvaluator =
         factory.forApplication(firstVals.getProducingTransformInternal(), null);