You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/05 00:58:31 UTC

[2/3] incubator-beam git commit: Add BundleFactory, ImmutabilityCheckingBundleFactory

Add BundleFactory, ImmutabilityCheckingBundleFactory

This allows checks to be made on the contents of bundles.
ImmutabilityCheckingBundleFactory produces bundles that ensure that
elements output to a bundle are not modified after being output.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/334ab99a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/334ab99a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/334ab99a

Branch: refs/heads/master
Commit: 334ab99ab39b7f0632848b789e2c0af1782b11c0
Parents: ac314ee
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 17 17:39:45 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Apr 4 15:44:26 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/inprocess/BundleFactory.java    |  50 +++++
 .../ExecutorServiceParallelExecutor.java        |   5 +-
 .../ImmutabilityCheckingBundleFactory.java      | 131 +++++++++++
 .../inprocess/InProcessBundleFactory.java       | 157 +++++++++++++
 .../inprocess/InProcessEvaluationContext.java   |  18 +-
 .../inprocess/InProcessPipelineRunner.java      |   5 +
 .../BoundedReadEvaluatorFactoryTest.java        |  21 +-
 .../inprocess/FlattenEvaluatorFactoryTest.java  |  11 +-
 .../GroupByKeyEvaluatorFactoryTest.java         |  10 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  | 220 +++++++++++++++++++
 .../inprocess/InMemoryWatermarkManagerTest.java |  49 +++--
 .../inprocess/InProcessBundleFactoryTest.java   | 197 +++++++++++++++++
 .../InProcessEvaluationContextTest.java         |  11 +-
 .../ParDoMultiEvaluatorFactoryTest.java         |  95 ++++----
 .../ParDoSingleEvaluatorFactoryTest.java        | 129 ++++++-----
 .../inprocess/TransformExecutorTest.java        |  10 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |  10 +-
 .../inprocess/ViewEvaluatorFactoryTest.java     |   5 +-
 18 files changed, 980 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
new file mode 100644
index 0000000..cb8a369
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * A factory that creates {@link UncommittedBundle UncommittedBundles}.
+ */
+public interface BundleFactory {
+  /**
+   * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to
+   * the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
+
+  /**
+   * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
+   * belong to the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);
+
+  /**
+   * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
+   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
+   * belong to the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 628f107..9af6f97 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -374,8 +374,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
                   KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
               @SuppressWarnings({"unchecked", "rawtypes"})
               CommittedBundle<?> bundle =
-                  InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
-                          (PCollection) transform.getInput(), keyTimers.getKey())
+                  evaluationContext
+                      .createKeyedBundle(
+                          null, keyTimers.getKey(), (PCollection) transform.getInput())
                       .add(WindowedValue.valueInEmptyWindows(work))
                       .commit(Instant.now());
               scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
new file mode 100644
index 0000000..44670e8
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.Throwables;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} that ensures that elements added to it are not mutated after being
+ * output. Immutability checks are enforced at the time {@link UncommittedBundle#commit(Instant)} is
+ * called, checking the value at that time against the value at the time the element was added. All
+ * elements added to the bundle will be encoded by the {@link Coder} of the underlying
+ * {@link PCollection}.
+ *
+ * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
+ * after it is added to an output {@link PCollection}.
+ */
+class ImmutabilityCheckingBundleFactory implements BundleFactory {
+  /**
+   * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying
+   * {@link BundleFactory} to create the output bundle.
+   */
+  public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
+    return new ImmutabilityCheckingBundleFactory(underlying);
+  }
+
+  private final BundleFactory underlying;
+
+  private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
+    this.underlying = checkNotNull(underlying);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
+  }
+
+  private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {
+    private final UncommittedBundle<T> underlying;
+    private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors;
+    private Coder<T> coder;
+
+    public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
+      this.underlying = underlying;
+      mutationDetectors = HashMultimap.create();
+      coder = SerializableUtils.clone(getPCollection().getCoder());
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return underlying.getPCollection();
+    }
+
+    @Override
+    public UncommittedBundle<T> add(WindowedValue<T> element) {
+      try {
+        mutationDetectors.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw Throwables.propagate(e);
+      }
+      underlying.add(element);
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+      for (MutationDetector detector : mutationDetectors.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException exn) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s mutated value %s after it was output (new value was %s)."
+                          + " Values must not be mutated in any way after being output.",
+                      underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                      exn.getSavedValue(),
+                      exn.getNewValue()),
+                  exn.getSavedValue(),
+                  exn.getNewValue(),
+                  exn));
+        }
+      }
+      return underlying.commit(synchronizedProcessingTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
new file mode 100644
index 0000000..7ca1b60
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory that produces bundles that perform no additional validation.
+ */
+class InProcessBundleFactory implements BundleFactory {
+  public static InProcessBundleFactory create() {
+    return new InProcessBundleFactory();
+  }
+
+  private InProcessBundleFactory() {}
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return InProcessBundle.unkeyed(output);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return input.isKeyed()
+        ? InProcessBundle.keyed(output, input.getKey())
+        : InProcessBundle.unkeyed(output);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return InProcessBundle.keyed(output, key);
+  }
+
+  /**
+   * A {@link UncommittedBundle} that buffers elements in memory.
+   */
+  private static final class InProcessBundle<T> implements UncommittedBundle<T> {
+    private final PCollection<T> pcollection;
+    private final boolean keyed;
+    private final Object key;
+    private boolean committed = false;
+    private ImmutableList.Builder<WindowedValue<T>> elements;
+
+    /**
+     * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key.
+     */
+    public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
+      return new InProcessBundle<T>(pcollection, false, null);
+    }
+
+    /**
+     * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
+     * key.
+     *
+     * <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
+     * information.
+     */
+    public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
+      return new InProcessBundle<T>(pcollection, true, key);
+    }
+
+    private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
+      this.pcollection = pcollection;
+      this.keyed = keyed;
+      this.key = key;
+      this.elements = ImmutableList.builder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return pcollection;
+    }
+
+    @Override
+    public InProcessBundle<T> add(WindowedValue<T> element) {
+      checkState(
+          !committed,
+          "Can't add element %s to committed bundle in PCollection %s",
+          element,
+          pcollection);
+      elements.add(element);
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
+      checkState(!committed, "Can't commit already committed bundle %s", this);
+      committed = true;
+      final Iterable<WindowedValue<T>> committedElements = elements.build();
+      return new CommittedBundle<T>() {
+        @Override
+        @Nullable
+        public Object getKey() {
+          return key;
+        }
+
+        @Override
+        public boolean isKeyed() {
+          return keyed;
+        }
+
+        @Override
+        public Iterable<WindowedValue<T>> getElements() {
+          return committedElements;
+        }
+
+        @Override
+        public PCollection<T> getPCollection() {
+          return pcollection;
+        }
+
+        @Override
+        public Instant getSynchronizedProcessingOutputWatermark() {
+          return synchronizedCompletionTime;
+        }
+
+        @Override
+        public String toString() {
+          return MoreObjects.toStringHelper(this)
+              .omitNullValues()
+              .add("pcollection", pcollection)
+              .add("key", key)
+              .add("elements", committedElements)
+              .toString();
+        }
+      };
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index dcbbf40..078827d 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -77,6 +77,7 @@ class InProcessEvaluationContext {
   /** The options that were used to create this {@link Pipeline}. */
   private final InProcessPipelineOptions options;
 
+  private final BundleFactory bundleFactory;
   /** The current processing time and event time watermarks and timers. */
   private final InMemoryWatermarkManager watermarkManager;
 
@@ -93,21 +94,24 @@ class InProcessEvaluationContext {
 
   public static InProcessEvaluationContext create(
       InProcessPipelineOptions options,
+      BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     return new InProcessEvaluationContext(
-        options, rootTransforms, valueToConsumers, stepNames, views);
+        options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
   }
 
   private InProcessEvaluationContext(
       InProcessPipelineOptions options,
+      BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     this.options = checkNotNull(options);
+    this.bundleFactory = checkNotNull(bundleFactory);
     checkNotNull(rootTransforms);
     checkNotNull(valueToConsumers);
     checkNotNull(stepNames);
@@ -207,7 +211,7 @@ class InProcessEvaluationContext {
    * Create a {@link UncommittedBundle} for use by a source.
    */
   public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return InProcessBundle.unkeyed(output);
+    return bundleFactory.createRootBundle(output);
   }
 
   /**
@@ -215,9 +219,7 @@ class InProcessEvaluationContext {
    * PCollection}.
    */
   public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
-    return input.isKeyed()
-        ? InProcessBundle.keyed(output, input.getKey())
-        : InProcessBundle.unkeyed(output);
+    return bundleFactory.createBundle(input, output);
   }
 
   /**
@@ -226,7 +228,7 @@ class InProcessEvaluationContext {
    */
   public <T> UncommittedBundle<T> createKeyedBundle(
       CommittedBundle<?> input, Object key, PCollection<T> output) {
-    return InProcessBundle.keyed(output, key);
+    return bundleFactory.createKeyedBundle(input, key, output);
   }
 
   /**
@@ -355,7 +357,9 @@ class InProcessEvaluationContext {
    * for each time they are set.
    */
   public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
-    return watermarkManager.extractFiredTimers();
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired =
+        watermarkManager.extractFiredTimers();
+    return fired;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 8123711..4fb01b7 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -232,6 +232,7 @@ public class InProcessPipelineRunner
     InProcessEvaluationContext context =
         InProcessEvaluationContext.create(
             getPipelineOptions(),
+            createBundleFactory(getPipelineOptions()),
             consumerTrackingVisitor.getRootTransforms(),
             consumerTrackingVisitor.getValueToConsumers(),
             consumerTrackingVisitor.getStepNames(),
@@ -271,6 +272,10 @@ public class InProcessPipelineRunner
     return Collections.emptyMap();
   }
 
+  private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
+    return InProcessBundleFactory.create();
+  }
+
   /**
    * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index ebece5f..8e92caf 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
@@ -48,6 +47,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -63,20 +63,22 @@ public class BoundedReadEvaluatorFactoryTest {
   private PCollection<Long> longs;
   private TransformEvaluatorFactory factory;
   @Mock private InProcessEvaluationContext context;
+  private BundleFactory bundleFactory;
 
   @Before
   public void setup() {
+    MockitoAnnotations.initMocks(this);
     source = CountingSource.upTo(10L);
     TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
     factory = new BoundedReadEvaluatorFactory();
-    context = mock(InProcessEvaluationContext.class);
+    bundleFactory = InProcessBundleFactory.create();
   }
 
   @Test
   public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
-    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
 
     TransformEvaluator<?> evaluator =
@@ -96,8 +98,7 @@ public class BoundedReadEvaluatorFactoryTest {
    */
   @Test
   public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
-    UncommittedBundle<Long> output =
-        InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
 
     TransformEvaluator<?> evaluator =
@@ -111,7 +112,7 @@ public class BoundedReadEvaluatorFactoryTest {
         containsInAnyOrder(
             gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
 
-    UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
@@ -132,8 +133,8 @@ public class BoundedReadEvaluatorFactoryTest {
    */
   @Test
   public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
-    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
-    UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs);
+    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput);
 
     // create both evaluators before finishing either.
@@ -171,7 +172,7 @@ public class BoundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
@@ -189,7 +190,7 @@ public class BoundedReadEvaluatorFactoryTest {
     PCollection<Long> pcollection = p.apply(Read.from(source));
     AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
 
-    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
     TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
index 39cc54a..f93abd8 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
@@ -45,6 +45,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class FlattenEvaluatorFactoryTest {
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
   @Test
   public void testFlattenInMemoryEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -54,13 +55,15 @@ public class FlattenEvaluatorFactoryTest {
 
     PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
 
-    CommittedBundle<Integer> leftBundle = InProcessBundle.unkeyed(left).commit(Instant.now());
-    CommittedBundle<Integer> rightBundle = InProcessBundle.unkeyed(right).commit(Instant.now());
+    CommittedBundle<Integer> leftBundle =
+        bundleFactory.createRootBundle(left).commit(Instant.now());
+    CommittedBundle<Integer> rightBundle =
+        bundleFactory.createRootBundle(right).commit(Instant.now());
 
     InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
 
-    UncommittedBundle<Integer> flattenedLeftBundle = InProcessBundle.unkeyed(flattened);
-    UncommittedBundle<Integer> flattenedRightBundle = InProcessBundle.unkeyed(flattened);
+    UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createRootBundle(flattened);
+    UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createRootBundle(flattened);
 
     when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle);
     when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index 9c0957d..e80125f 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -50,6 +50,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class GroupByKeyEvaluatorFactoryTest {
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
   @Test
   public void testInMemoryEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -67,15 +69,15 @@ public class GroupByKeyEvaluatorFactoryTest {
         kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
 
     CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
-        InProcessBundle.unkeyed(kvs).commit(Instant.now());
+        bundleFactory.createRootBundle(kvs).commit(Instant.now());
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
 
     UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        InProcessBundle.keyed(groupedKvs, "foo");
+        bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
     UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        InProcessBundle.keyed(groupedKvs, "bar");
+        bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
     UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        InProcessBundle.keyed(groupedKvs, "baz");
+        bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
 
     when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
     when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
new file mode 100644
index 0000000..40b1d5a
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ImmutabilityCheckingBundleFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityCheckingBundleFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private ImmutabilityCheckingBundleFactory factory;
+  private PCollection<byte[]> created;
+  private PCollection<byte[]> transformed;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
+    transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
+    factory = ImmutabilityCheckingBundleFactory.create(InProcessBundleFactory.create());
+  }
+
+  @Test
+  public void noMutationRootBundleSucceeds() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+    byte[] array = new byte[] {0, 1, 2};
+    root.add(WindowedValue.valueInGlobalWindow(array));
+    CommittedBundle<byte[]> committed = root.commit(Instant.now());
+
+    assertThat(
+        committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
+  }
+
+  @Test
+  public void noMutationKeyedBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            new byte[] {4, 8, 12},
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    keyed.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void noMutationCreateBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            new byte[] {4, 8, 12},
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    intermediate.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void mutationBeforeAddRootBundleSucceeds() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+    byte[] array = new byte[] {0, 1, 2};
+    array[1] = 2;
+    root.add(WindowedValue.valueInGlobalWindow(array));
+    CommittedBundle<byte[]> committed = root.commit(Instant.now());
+
+    assertThat(
+        committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array)));
+  }
+
+  @Test
+  public void mutationBeforeAddKeyedBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    array[0] = Byte.MAX_VALUE;
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    keyed.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void mutationBeforeAddCreateBundleSucceeds() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    array[2] = -3;
+    intermediate.add(windowedArray);
+
+    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(windowedArray));
+  }
+
+  @Test
+  public void mutationAfterAddRootBundleThrows() {
+    UncommittedBundle<byte[]> root = factory.createRootBundle(created);
+    byte[] array = new byte[] {0, 1, 2};
+    root.add(WindowedValue.valueInGlobalWindow(array));
+
+    array[1] = 2;
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage("Values must not be mutated in any way after being output");
+    CommittedBundle<byte[]> committed = root.commit(Instant.now());
+  }
+
+  @Test
+  public void mutationAfterAddKeyedBundleThrows() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    keyed.add(windowedArray);
+
+    array[0] = Byte.MAX_VALUE;
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage("Values must not be mutated in any way after being output");
+    CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
+  }
+
+  @Test
+  public void mutationAfterAddCreateBundleThrows() {
+    CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
+    UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed);
+
+    byte[] array = new byte[] {4, 8, 12};
+    WindowedValue<byte[]> windowedArray =
+        WindowedValue.of(
+            array,
+            new Instant(891L),
+            new IntervalWindow(new Instant(0), new Instant(1000)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    intermediate.add(windowedArray);
+
+    array[2] = -3;
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage("Values must not be mutated in any way after being output");
+    CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
+  }
+
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @Override
+    public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
index 140ac05..93d2a42 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
@@ -84,6 +84,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   private transient PCollection<Integer> flattened;
 
   private transient InMemoryWatermarkManager manager;
+  private transient BundleFactory bundleFactory;
 
   @Before
   public void setup() {
@@ -131,6 +132,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     clock = MockClock.fromInstant(new Instant(1000));
 
     manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers);
+    bundleFactory = InProcessBundleFactory.create();
   }
 
   /**
@@ -246,7 +248,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp));
 
     CommittedBundle<?> completedFlattenBundle =
-        InProcessBundle.unkeyed(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(),
         TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
         null);
@@ -342,13 +344,13 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void updateWatermarkWithKeyedWatermarkHolds() {
     CommittedBundle<Integer> firstKeyBundle =
-        InProcessBundle.keyed(createdInts, "Odd")
+        bundleFactory.createKeyedBundle(null, "Odd", createdInts)
             .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
             .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
             .commit(clock.now());
 
     CommittedBundle<Integer> secondKeyBundle =
-        InProcessBundle.keyed(createdInts, "Even")
+        bundleFactory.createKeyedBundle(null, "Even", createdInts)
             .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
             .commit(clock.now());
 
@@ -368,7 +370,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
 
     CommittedBundle<Integer> fauxFirstKeyTimerBundle =
-        InProcessBundle.keyed(createdInts, "Odd").commit(clock.now());
+        bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now());
     manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(),
         TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -376,7 +378,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
 
     CommittedBundle<Integer> fauxSecondKeyTimerBundle =
-        InProcessBundle.keyed(createdInts, "Even").commit(clock.now());
+        bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now());
     manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
         TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L));
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
@@ -396,7 +398,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void updateOutputWatermarkShouldBeMonotonic() {
     CommittedBundle<?> firstInput =
-        InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
         Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L));
     TransformWatermarks firstWatermarks =
@@ -404,7 +406,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
 
     CommittedBundle<?> secondInput =
-        InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
         Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L));
     TransformWatermarks secondWatermarks =
@@ -579,7 +581,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
     CommittedBundle<Integer> createOutput =
-        InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
+        bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
 
     manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
         Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -606,7 +608,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         not(laterThan(new Instant(1250L))));
 
     CommittedBundle<?> filterOutputBundle =
-        InProcessBundle.unkeyed(intsToFlatten).commit(new Instant(1250L));
+        bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L));
     manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(),
         TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -673,9 +675,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
 
     CommittedBundle<Integer> filteredTimerBundle =
-        InProcessBundle.keyed(filtered, "key").commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        bundleFactory
+            .createKeyedBundle(null, "key", filtered)
+            .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     CommittedBundle<Integer> filteredTimerResult =
-        InProcessBundle.keyed(filteredTimesTwo, "key")
+        bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo)
             .commit(filteredWms.getSynchronizedProcessingOutputTime());
     // Complete the processing time timer
     manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(),
@@ -725,7 +729,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
 
     CommittedBundle<Integer> createOutput =
-        InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
+        bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L));
 
     manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
         Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -736,7 +740,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
 
     CommittedBundle<Integer> createSecondOutput =
-        InProcessBundle.unkeyed(createdInts).commit(new Instant(750L));
+        bundleFactory.createRootBundle(createdInts).commit(new Instant(750L));
     manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
         Collections.<CommittedBundle<?>>singleton(createSecondOutput),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -784,13 +788,20 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @Test
   public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
     CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 2, 3);
-    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(created), new Instant(29_919_235L));
+    manager.updateWatermarks(
+        null,
+        createdInts.getProducingTransformInternal(),
+        TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(created),
+        new Instant(29_919_235L));
 
     Instant upstreamHold = new Instant(2048L);
     CommittedBundle<Integer> filteredBundle =
-        InProcessBundle.keyed(filtered, "key").commit(upstreamHold);
-    manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.empty(),
+        bundleFactory.createKeyedBundle(null, "key", filtered).commit(upstreamHold);
+    manager.updateWatermarks(
+        created,
+        filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(),
         Collections.<CommittedBundle<?>>singleton(filteredBundle),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
@@ -1094,7 +1105,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
   @SafeVarargs
   private final <T> CommittedBundle<T> timestampedBundle(
       PCollection<T> pc, TimestampedValue<T>... values) {
-    UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
+    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc);
     for (TimestampedValue<T> value : values) {
       bundle.add(
           WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp()));
@@ -1104,7 +1115,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
   @SafeVarargs
   private final <T> CommittedBundle<T> globallyWindowedBundle(PCollection<T> pc, T... values) {
-    UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
+    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc);
     for (T value : values) {
       bundle.add(WindowedValue.valueInGlobalWindow(value));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
new file mode 100644
index 0000000..060d43c
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link InProcessBundleFactory}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessBundleFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private InProcessBundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  private PCollection<Integer> created;
+  private PCollection<KV<String, Integer>> downstream;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    created = p.apply(Create.of(1, 2, 3));
+    downstream = created.apply(WithKeys.<String, Integer>of("foo"));
+  }
+
+  @Test
+  public void createRootBundleShouldCreateWithNullKey() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+    UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection);
+
+    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+
+    assertThat(bundle.isKeyed(), is(false));
+    assertThat(bundle.getKey(), nullValue());
+  }
+
+  private void createKeyedBundle(Object key) {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+    UncommittedBundle<Integer> inFlightBundle =
+        bundleFactory.createKeyedBundle(null, key, pcollection);
+
+    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+    assertThat(bundle.isKeyed(), is(true));
+    assertThat(bundle.getKey(), equalTo(key));
+  }
+
+  @Test
+  public void keyedWithNullKeyShouldCreateKeyedBundle() {
+    createKeyedBundle(null);
+  }
+
+  @Test
+  public void keyedWithKeyShouldCreateKeyedBundle() {
+    createKeyedBundle(new Object());
+  }
+
+  private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
+    PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
+
+    UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection);
+    Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+    for (WindowedValue<T> elem : elems) {
+      bundle.add(elem);
+      expectations.add(equalTo(elem));
+    }
+    Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
+        Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
+    assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
+  }
+
+  @Test
+  public void getElementsBeforeAddShouldReturnEmptyIterable() {
+    afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList());
+  }
+
+  @Test
+  public void getElementsAfterAddShouldReturnAddedElements() {
+    WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));
+
+    afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
+  }
+
+  @Test
+  public void addAfterCommitShouldThrowException() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+    bundle.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+    assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("3");
+    thrown.expectMessage("committed");
+
+    bundle.add(WindowedValue.valueInGlobalWindow(3));
+  }
+
+  @Test
+  public void commitAfterCommitShouldThrowException() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection);
+    bundle.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+    assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("committed");
+
+    bundle.commit(Instant.now());
+  }
+
+  @Test
+  public void createBundleUnkeyedResultUnkeyed() {
+    CommittedBundle<KV<String, Integer>> newBundle =
+        bundleFactory
+            .createBundle(bundleFactory.createRootBundle(created).commit(Instant.now()), downstream)
+            .commit(Instant.now());
+    assertThat(newBundle.isKeyed(), is(false));
+  }
+
+  @Test
+  public void createBundleKeyedResultPropagatesKey() {
+    CommittedBundle<KV<String, Integer>> newBundle =
+        bundleFactory
+            .createBundle(
+                bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
+                downstream)
+            .commit(Instant.now());
+    assertThat(newBundle.isKeyed(), is(true));
+    assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+
+  @Test
+  public void createRootBundleUnkeyed() {
+    assertThat(bundleFactory.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false));
+  }
+
+  @Test
+  public void createKeyedBundleKeyed() {
+    CommittedBundle<KV<String, Integer>> keyedBundle =
+        bundleFactory
+            .createKeyedBundle(
+                bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
+            .commit(Instant.now());
+    assertThat(keyedBundle.isKeyed(), is(true));
+    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 564f3f2..fde2cb4 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -124,6 +124,7 @@ public class InProcessEvaluationContextTest {
     Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
     context = InProcessEvaluationContext.create(
             runner.getPipelineOptions(),
+            InProcessBundleFactory.create(),
             rootTransforms,
             valueToConsumers,
             stepNames,
@@ -170,7 +171,9 @@ public class InProcessEvaluationContextTest {
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
     context.handleResult(
-        InProcessBundle.keyed(created, "foo").commit(Instant.now()),
+        InProcessBundleFactory.create()
+            .createKeyedBundle(null, "foo", created)
+            .commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(created.getProducingTransformInternal())
             .withState(stepContext.commitState())
@@ -262,7 +265,7 @@ public class InProcessEvaluationContextTest {
             .withCounters(againCounters)
             .build();
     context.handleResult(
-        InProcessBundle.unkeyed(created).commit(Instant.now()),
+        context.createRootBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         secondResult);
     assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
@@ -289,7 +292,7 @@ public class InProcessEvaluationContextTest {
             .build();
 
     context.handleResult(
-        InProcessBundle.keyed(created, myKey).commit(Instant.now()),
+        context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         stateResult);
 
@@ -371,7 +374,7 @@ public class InProcessEvaluationContextTest {
     // haven't added any timers, must be empty
     assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
     context.handleResult(
-        InProcessBundle.keyed(created, key).commit(Instant.now()),
+        context.createKeyedBundle(null, key, created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         timerResult);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index 664161c..87247ac 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -69,6 +69,8 @@ import java.io.Serializable;
  */
 @RunWith(JUnit4.class)
 public class ParDoMultiEvaluatorFactoryTest implements Serializable {
+  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
   @Test
   public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -80,26 +82,30 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     final TupleTag<Integer> lengthTag = new TupleTag<>();
 
     BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(new DoFn<String, KV<String, Integer>>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-            c.sideOutput(elementTag, c.element());
-            c.sideOutput(lengthTag, c.element().length());
-          }
-        }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+                    c.sideOutput(elementTag, c.element());
+                    c.sideOutput(lengthTag, c.element().length());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
     PCollectionTuple outputTuple = input.apply(pardo);
 
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
     PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
-    UncommittedBundle<Integer> lengthOutputBundle = InProcessBundle.unkeyed(lengthOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
+    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
     when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -114,8 +120,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
     com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory().forApplication(
-            mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -163,24 +170,28 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     final TupleTag<Integer> lengthTag = new TupleTag<>();
 
     BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(new DoFn<String, KV<String, Integer>>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-            c.sideOutput(elementTag, c.element());
-            c.sideOutput(lengthTag, c.element().length());
-          }
-        }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+        ParDo.of(
+                new DoFn<String, KV<String, Integer>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+                    c.sideOutput(elementTag, c.element());
+                    c.sideOutput(lengthTag, c.element().length());
+                  }
+                })
+            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
     PCollectionTuple outputTuple = input.apply(pardo);
 
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
     when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -194,8 +205,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
     com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory().forApplication(
-            mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -206,8 +218,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     InProcessTransformResult result = evaluator.finishBundle();
     assertThat(
         result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
-            mainOutputBundle, elementOutputBundle));
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(result.getCounters(), equalTo(counters));
 
@@ -261,14 +272,16 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
             .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
     PCollectionTuple outputTuple = input.apply(pardo);
 
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
     when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -282,8 +295,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
     com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory().forApplication(
-            mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -368,14 +382,16 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
             .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
     PCollectionTuple outputTuple = input.apply(pardo);
 
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
     PCollection<String> elementOutput = outputTuple.get(elementTag);
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
     when(evaluationContext.createBundle(inputBundle, elementOutput))
@@ -389,8 +405,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
     com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory().forApplication(
-            mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+        new ParDoMultiEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index 9943fd7..704eb2a 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -66,21 +66,27 @@ import java.io.Serializable;
  */
 @RunWith(JUnit4.class)
 public class ParDoSingleEvaluatorFactoryTest implements Serializable {
+  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
   @Test
   public void testParDoInMemoryTransformEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
 
     PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-    PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() {
-      @Override public void processElement(ProcessContext c) {
-        c.output(c.element().length());
-      }
-    }));
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle =
-        InProcessBundle.unkeyed(collection);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
     when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
     InProcessExecutionContext executionContext =
         new InProcessExecutionContext(null, null, null, null);
@@ -90,8 +96,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
     com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory().forApplication(
-            collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -118,16 +125,20 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
     final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
-    PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() {
-      @Override public void processElement(ProcessContext c) {
-        c.sideOutput(sideOutputTag, c.element().length());
-      }
-    }));
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.sideOutput(sideOutputTag, c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<Integer> outputBundle =
-        InProcessBundle.unkeyed(collection);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
     when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
     InProcessExecutionContext executionContext =
         new InProcessExecutionContext(null, null, null, null);
@@ -137,8 +148,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
     TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory().forApplication(
-            collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -183,10 +195,12 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
             });
     PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
 
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
 
@@ -246,42 +260,44 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     ParDo.Bound<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.windowingInternals().stateInternals();
-                    c.windowingInternals()
-                        .timerInternals()
-                        .setTimer(
-                            TimerData.of(
-                                StateNamespaces.window(
-                                    IntervalWindow.getCoder(),
-                                    new IntervalWindow(
-                                        new Instant(0).plus(Duration.standardMinutes(5)),
-                                        new Instant(1)
-                                            .plus(Duration.standardMinutes(5))
-                                            .plus(Duration.standardHours(1)))),
-                                new Instant(54541L),
-                                TimeDomain.EVENT_TIME));
-                    c.windowingInternals()
-                        .timerInternals()
-                        .deleteTimer(
-                            TimerData.of(
-                                StateNamespaces.window(
-                                    IntervalWindow.getCoder(),
-                                    new IntervalWindow(
-                                        new Instant(0),
-                                        new Instant(0).plus(Duration.standardHours(1)))),
-                                new Instant(3400000),
-                                TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
-                  }
-                });
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals().stateInternals();
+                c.windowingInternals()
+                    .timerInternals()
+                    .setTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0).plus(Duration.standardMinutes(5)),
+                                    new Instant(1)
+                                        .plus(Duration.standardMinutes(5))
+                                        .plus(Duration.standardHours(1)))),
+                            new Instant(54541L),
+                            TimeDomain.EVENT_TIME));
+                c.windowingInternals()
+                    .timerInternals()
+                    .deleteTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0),
+                                    new Instant(0).plus(Duration.standardHours(1)))),
+                            new Instant(3400000),
+                            TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+              }
+            });
     PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
 
-    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
 
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
 
@@ -303,10 +319,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     assertThat(
         result.getTimerUpdate(),
         equalTo(
-            TimerUpdate.builder("myKey")
-                .setTimer(addedTimer)
-                .deletedTimer(deletedTimer)
-                .build()));
+            TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
   }
 }