You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:50 UTC

[23/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
index 45a5cfb..e4d10a0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -45,8 +46,8 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class AfterWatermarkStateMachineTest {
 
-  @Mock private TriggerStateMachine mockEarly;
-  @Mock private TriggerStateMachine mockLate;
+  @Mock private OnceTriggerStateMachine mockEarly;
+  @Mock private OnceTriggerStateMachine mockLate;
 
   private SimpleTriggerStateMachineTester<IntervalWindow> tester;
   private static TriggerStateMachine.TriggerContext anyTriggerContext() {
@@ -69,7 +70,7 @@ public class AfterWatermarkStateMachineTest {
     MockitoAnnotations.initMocks(this);
   }
 
-  public void testRunningAsTrigger(TriggerStateMachine mockTrigger, IntervalWindow window)
+  public void testRunningAsTrigger(OnceTriggerStateMachine mockTrigger, IntervalWindow window)
       throws Exception {
 
     // Don't fire due to mock saying no

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
index 1bc757e..4512848 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
@@ -18,11 +18,12 @@
 package org.apache.beam.runners.core.triggers;
 
 import com.google.common.collect.Lists;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 
 /**
- * No-op {@link TriggerStateMachine} implementation for testing.
+ * No-op {@link OnceTriggerStateMachine} implementation for testing.
  */
-abstract class StubTriggerStateMachine extends TriggerStateMachine {
+abstract class StubTriggerStateMachine extends OnceTriggerStateMachine {
   /**
    * Create a stub {@link TriggerStateMachine} instance which returns the specified name on {@link
    * #toString()}.
@@ -41,7 +42,7 @@ abstract class StubTriggerStateMachine extends TriggerStateMachine {
   }
 
   @Override
-  public void onFire(TriggerContext context) throws Exception {
+  protected void onOnlyFiring(TriggerContext context) throws Exception {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index e14e813..bec2113 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -117,7 +117,7 @@
                   </relocation>
                 </relocations>
                 <transformers>
-                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                 </transformers>
               </configuration>
             </execution>
@@ -155,8 +155,7 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=DirectRunner",
-                    "--runnerDeterminedSharding=false"
+                    "--runner=DirectRunner"
                   ]
                 </beamTestPipelineOptions>
               </systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 70e3ac3..8c45449 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -19,8 +19,8 @@
 package org.apache.beam.runners.direct;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.base.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 
@@ -36,10 +36,12 @@ abstract class CommittedResult {
 
   /**
    * Returns the {@link CommittedBundle} that contains the input elements that could not be
-   * processed by the evaluation. The returned optional is present if there were any unprocessed
-   * input elements, and absent otherwise.
+   * processed by the evaluation.
+   *
+   * <p>{@code null} if the input bundle was null.
    */
-  public abstract Optional<? extends CommittedBundle<?>> getUnprocessedInputs();
+  @Nullable
+  public abstract CommittedBundle<?> getUnprocessedInputs();
 
   /**
    * Returns the outputs produced by the transform.
@@ -57,7 +59,7 @@ abstract class CommittedResult {
 
   public static CommittedResult create(
       TransformResult<?> original,
-      Optional<? extends CommittedBundle<?>> unprocessedElements,
+      CommittedBundle<?> unprocessedElements,
       Iterable<? extends CommittedBundle<?>> outputs,
       Set<OutputType> producedOutputs) {
     return new AutoValue_CommittedResult(original.getTransform(),

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
index ad17b2b..c2c0afa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.collect.ListMultimap;
 import java.util.Collection;
 import java.util.List;
@@ -38,8 +36,7 @@ import org.apache.beam.sdk.values.PValue;
 class DirectGraph {
   private final Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers;
   private final Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters;
-  private final ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers;
-  private final ListMultimap<PValue, AppliedPTransform<?, ?, ?>> allConsumers;
+  private final ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers;
 
   private final Set<AppliedPTransform<?, ?, ?>> rootTransforms;
   private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
@@ -47,36 +44,23 @@ class DirectGraph {
   public static DirectGraph create(
       Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
       Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
-      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers,
-      ListMultimap<PValue, AppliedPTransform<?, ?, ?>> allConsumers,
+      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
       Set<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
-    return new DirectGraph(
-        producers, viewWriters, perElementConsumers, allConsumers, rootTransforms, stepNames);
+    return new DirectGraph(producers, viewWriters, primitiveConsumers, rootTransforms, stepNames);
   }
 
   private DirectGraph(
       Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
       Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
-      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers,
-      ListMultimap<PValue, AppliedPTransform<?, ?, ?>> allConsumers,
+      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
       Set<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
     this.producers = producers;
     this.viewWriters = viewWriters;
-    this.perElementConsumers = perElementConsumers;
-    this.allConsumers = allConsumers;
+    this.primitiveConsumers = primitiveConsumers;
     this.rootTransforms = rootTransforms;
     this.stepNames = stepNames;
-    for (AppliedPTransform<?, ?, ?> step : stepNames.keySet()) {
-      for (PValue input : step.getInputs().values()) {
-        checkArgument(
-            allConsumers.get(input).contains(step),
-            "Step %s lists value %s as input, but it is not in the graph of consumers",
-            step.getFullName(),
-            input);
-      }
-    }
   }
 
   AppliedPTransform<?, ?, ?> getProducer(PCollection<?> produced) {
@@ -87,22 +71,14 @@ class DirectGraph {
     return viewWriters.get(view);
   }
 
-  List<AppliedPTransform<?, ?, ?>> getPerElementConsumers(PValue consumed) {
-    return perElementConsumers.get(consumed);
-  }
-
-  List<AppliedPTransform<?, ?, ?>> getAllConsumers(PValue consumed) {
-    return allConsumers.get(consumed);
+  List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
+    return primitiveConsumers.get(consumed);
   }
 
   Set<AppliedPTransform<?, ?, ?>> getRootTransforms() {
     return rootTransforms;
   }
 
-  Set<PCollection<?>> getPCollections() {
-    return producers.keySet();
-  }
-
   Set<PCollectionView<?>> getViews() {
     return viewWriters.keySet();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 675de2c..d54de5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -21,26 +21,19 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Sets;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.runners.core.construction.TransformInputs;
-import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -48,15 +41,11 @@ import org.slf4j.LoggerFactory;
  * input after the upstream transform has produced and committed output.
  */
 class DirectGraphVisitor extends PipelineVisitor.Defaults {
-  private static final Logger LOG = LoggerFactory.getLogger(DirectGraphVisitor.class);
 
   private Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
   private Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters = new HashMap<>();
-  private Set<PCollectionView<?>> consumedViews = new HashSet<>();
 
-  private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers =
-      ArrayListMultimap.create();
-  private ListMultimap<PValue, AppliedPTransform<?, ?, ?>> allConsumers =
+  private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
       ArrayListMultimap.create();
 
   private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
@@ -84,13 +73,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
         getClass().getSimpleName());
     if (node.isRootNode()) {
       finalized = true;
-      checkState(
-          viewWriters.keySet().containsAll(consumedViews),
-          "All %ss that are consumed must be written by some %s %s: Missing %s",
-          PCollectionView.class.getSimpleName(),
-          WriteView.class.getSimpleName(),
-          PTransform.class.getSimpleName(),
-          Sets.difference(consumedViews, viewWriters.keySet()));
     }
   }
 
@@ -101,30 +83,18 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
     if (node.getInputs().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
-      Collection<PValue> mainInputs =
-          TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
-      if (!mainInputs.containsAll(node.getInputs().values())) {
-        LOG.debug(
-            "Inputs reduced to {} from {} by removing additional inputs",
-            mainInputs,
-            node.getInputs().values());
-      }
-      for (PValue value : mainInputs) {
-        perElementConsumers.put(value, appliedTransform);
-      }
       for (PValue value : node.getInputs().values()) {
-        allConsumers.put(value, appliedTransform);
+        primitiveConsumers.put(value, appliedTransform);
+      }
+      if (node.getTransform() instanceof ViewOverrideFactory.WriteView) {
+        viewWriters.put(
+            ((ViewOverrideFactory.WriteView<?, ?>) node.getTransform()).getView(),
+            node.toAppliedPTransform(getPipeline()));
       }
-    }
-    if (node.getTransform() instanceof ParDo.MultiOutput) {
-      consumedViews.addAll(((ParDo.MultiOutput<?, ?>) node.getTransform()).getSideInputs());
-    } else if (node.getTransform() instanceof ViewOverrideFactory.WriteView) {
-      viewWriters.put(
-          ((WriteView) node.getTransform()).getView(), node.toAppliedPTransform(getPipeline()));
     }
   }
 
-  @Override
+ @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
     if (value instanceof PCollection && !producers.containsKey(value)) {
@@ -149,6 +119,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
   public DirectGraph getGraph() {
     checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
     return DirectGraph.create(
-        producers, viewWriters, perElementConsumers, allConsumers, rootTransforms, stepNames);
+        producers, viewWriters, primitiveConsumers, rootTransforms, stepNames);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 06b8e29..2fc0dd4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -36,17 +36,13 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 class DirectGroupByKey<K, V>
     extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-  private final PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original;
+  private final GroupByKey<K, V> original;
 
   static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1";
   static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1";
-  private final WindowingStrategy<?, ?> outputWindowingStrategy;
 
-  DirectGroupByKey(
-      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original,
-      WindowingStrategy<?, ?> outputWindowingStrategy) {
-    this.original = original;
-    this.outputWindowingStrategy = outputWindowingStrategy;
+  DirectGroupByKey(GroupByKey<K, V> from) {
+    this.original = from;
   }
 
   @Override
@@ -61,6 +57,9 @@ class DirectGroupByKey<K, V>
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
     WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
+    // Update the windowing strategy as appropriate.
+    WindowingStrategy<?, ?> outputWindowingStrategy =
+        original.updateWindowingStrategy(inputWindowingStrategy);
 
     // By default, implement GroupByKey via a series of lower-level operations.
     return input

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 9c2de3d..c2eb5e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,34 +17,26 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
 final class DirectGroupByKeyOverrideFactory<K, V>
     extends SingleInputOutputOverrideFactory<
-        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
-        PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
   @Override
   public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
-                  PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
+                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
               transform) {
-
-    PCollection<KV<K, Iterable<V>>> output =
-        (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values());
-
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy()));
+        new DirectGroupByKey<>(transform.getTransform()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
index 53fb2f2..0e6fbab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -50,7 +50,7 @@ public class DirectRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
       return ImmutableList.<Class<? extends PipelineOptions>>of(
-          DirectOptions.class, DirectTestOptions.class);
+          DirectOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4621224..dbd1ec4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -38,11 +37,17 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -69,17 +74,16 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     IMMUTABILITY {
       @Override
       public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
-        return CONTAINS_UDF.contains(
-            PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
+        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
       }
     };
 
     /**
      * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
      */
-    private static final Set<String> CONTAINS_UDF =
+    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
         ImmutableSet.of(
-            PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
+            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
 
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
@@ -108,19 +112,22 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return bundleFactory;
     }
 
-    private static Map<String, Collection<ModelEnforcementFactory>>
+    @SuppressWarnings("rawtypes")
+    private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
         defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
-      ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements =
-          ImmutableMap.builder();
+      ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+          enforcements = ImmutableMap.builder();
       ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements =
           ImmutableList.builder();
       if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
         enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
       }
       Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
-      enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements);
+      enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
+      enforcements.put(MultiOutput.class, parDoEnforcements);
       return enforcements.build();
     }
+
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
@@ -216,45 +223,42 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @SuppressWarnings("rawtypes")
   @VisibleForTesting
   List<PTransformOverride> defaultTransformOverrides() {
-    DirectTestOptions testOptions = options.as(DirectTestOptions.class);
-    ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
-    if (testOptions.isRunnerDeterminedSharding()) {
-      builder.add(
-          PTransformOverride.of(
-              PTransformMatchers.writeWithRunnerDeterminedSharding(),
-              new WriteWithShardingFactory())); /* Uses a view internally. */
-    }
-    builder = builder.add(
-        PTransformOverride.of(
-            PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
-            new ViewOverrideFactory())) /* Uses pardos and GBKs */
+    return ImmutableList.<PTransformOverride>builder()
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.writeWithRunnerDeterminedSharding(),
+                new WriteWithShardingFactory())) /* Uses a view internally. */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                new ViewOverrideFactory())) /* Uses pardos and GBKs */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
+                PTransformMatchers.classEqualTo(TestStream.class),
                 new DirectTestStreamFactory(this))) /* primitive */
         // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
         // primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
         // state and timer pardos are implemented in terms of simple ParDos and extra primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.urnEqualTo(
-                    SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN),
+                PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
                 new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.urnEqualTo(SplittableParDo.SPLITTABLE_GBKIKWI_URN),
+                PTransformMatchers.classEqualTo(
+                    SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class),
                 new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
         .add(
             PTransformOverride.of(
-    PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
-                new DirectGroupByKeyOverrideFactory())); /* returns two chained primitives. */
-    return builder.build();
+                PTransformMatchers.classEqualTo(GroupByKey.class),
+                new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
+        .build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
deleted file mode 100644
index a426443..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTestOptions.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Internal-only options for tweaking the behavior of the {@link DirectRunner} in ways that users
- * should never do.
- *
- * <p>Currently, the only use is to disable user-friendly overrides that prevent fully testing
- * certain composite transforms.
- */
-@Internal
-@Hidden
-public interface DirectTestOptions extends PipelineOptions, ApplicationNameOptions {
-  @Default.Boolean(true)
-  @Description(
-      "Indicates whether this is an automatically-run unit test.")
-  boolean isRunnerDeterminedSharding();
-  void setRunnerDeterminedSharding(boolean goAheadAndDetermineSharding);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index d192785..e215070 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -159,9 +158,12 @@ class EvaluationContext {
     } else {
       outputTypes.add(OutputType.BUNDLE);
     }
-    CommittedResult committedResult =
-        CommittedResult.create(
-            result, getUnprocessedInput(completedBundle, result), committedBundles, outputTypes);
+    CommittedResult committedResult = CommittedResult.create(result,
+        completedBundle == null
+            ? null
+            : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
+        committedBundles,
+        outputTypes);
     // Update state internals
     CopyOnAccessInMemoryStateInternals theirState = result.getState();
     if (theirState != null) {
@@ -185,22 +187,6 @@ class EvaluationContext {
     return committedResult;
   }
 
-  /**
-   * Returns an {@link Optional} containing a bundle which contains all of the unprocessed elements
-   * that were not processed from the {@code completedBundle}. If all of the elements of the {@code
-   * completedBundle} were processed, or if {@code completedBundle} is null, returns an absent
-   * {@link Optional}.
-   */
-  private Optional<? extends CommittedBundle<?>> getUnprocessedInput(
-      @Nullable CommittedBundle<?> completedBundle, TransformResult<?> result) {
-    if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) {
-      return Optional.absent();
-    }
-    CommittedBundle<?> residual =
-        completedBundle.withElements((Iterable) result.getUnprocessedElements());
-    return Optional.of(residual);
-  }
-
   private Iterable<? extends CommittedBundle<?>> commitBundles(
       Iterable<? extends UncommittedBundle<?>> bundles) {
     ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 75e2562..71ab4cc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -49,11 +49,11 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,7 +77,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private final DirectGraph graph;
   private final RootProviderRegistry rootProviderRegistry;
   private final TransformEvaluatorRegistry registry;
-  private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;
+  @SuppressWarnings("rawtypes")
+  private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+      transformEnforcements;
 
   private final EvaluationContext evaluationContext;
 
@@ -110,7 +112,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
+      @SuppressWarnings("rawtypes")
+          Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+              transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
         targetParallelism,
@@ -126,7 +130,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
+      @SuppressWarnings("rawtypes")
+      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
     // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
@@ -232,8 +237,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
     Collection<ModelEnforcementFactory> enforcements =
         MoreObjects.firstNonNull(
-            transformEnforcements.get(
-                PTransformTranslation.urnForTransform(transform.getTransform())),
+            transformEnforcements.get(transform.getTransform().getClass()),
             Collections.<ModelEnforcementFactory>emptyList());
 
     TransformExecutor<T> callable =
@@ -351,18 +355,17 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(
             ExecutorUpdate.fromBundle(
-                outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection())));
+                outputBundle, graph.getPrimitiveConsumers(outputBundle.getPCollection())));
       }
-      Optional<? extends CommittedBundle<?>> unprocessedInputs =
-          committedResult.getUnprocessedInputs();
-      if (unprocessedInputs.isPresent()) {
+      CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
+      if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
         if (inputBundle.getPCollection() == null) {
           // TODO: Split this logic out of an if statement
-          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get());
+          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs);
         } else {
           allUpdates.offer(
               ExecutorUpdate.fromBundle(
-                  unprocessedInputs.get(),
+                  unprocessedInputs,
                   Collections.<AppliedPTransform<?, ?, ?>>singleton(
                       committedResult.getTransform())));
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 516f798..8aa75cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +79,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         (TransformEvaluator<T>)
             createEvaluator(
                 (AppliedPTransform) application,
-                (PCollection<InputT>) inputBundle.getPCollection(),
                 inputBundle.getKey(),
                 doFn,
                 transform.getSideInputs(),
@@ -102,7 +102,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   @SuppressWarnings({"unchecked", "rawtypes"})
   DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
-      PCollection<InputT> mainInput,
       StructuralKey<?> inputBundleKey,
       DoFn<InputT, OutputT> doFn,
       List<PCollectionView<?>> sideInputs,
@@ -121,7 +120,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         createParDoEvaluator(
             application,
             inputBundleKey,
-            mainInput,
             sideInputs,
             mainOutputTag,
             additionalOutputTags,
@@ -134,7 +132,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   ParDoEvaluator<InputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> key,
-      PCollection<InputT> mainInput,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
@@ -147,7 +144,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           evaluationContext,
           stepContext,
           application,
-          mainInput.getWindowingStrategy(),
+          ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values()))
+              .getWindowingStrategy(),
           fn,
           key,
           sideInputs,
@@ -175,4 +173,5 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
     return pcs;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 891d102..858ea34 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,17 +19,15 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -38,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -49,8 +48,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -63,48 +60,36 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  */
 class ParDoMultiOverrideFactory<InputT, OutputT>
     implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollectionTuple,
-        PTransform<PCollection<? extends InputT>, PCollectionTuple>> {
+        PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
   @Override
   public PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<? extends InputT>, PCollectionTuple,
-                  PTransform<PCollection<? extends InputT>, PCollectionTuple>>
-              application) {
-
-    try {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(application),
-          getReplacementForApplication(application));
-    } catch (IOException exc) {
-      throw new RuntimeException(exc);
-    }
+                  PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
+              transform) {
+    return PTransformReplacement.of(
+        PTransformReplacements.getSingletonMainInput(transform),
+        getReplacementTransform(transform.getTransform()));
   }
 
   @SuppressWarnings("unchecked")
-  private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication(
-      AppliedPTransform<
-              PCollection<? extends InputT>, PCollectionTuple,
-              PTransform<PCollection<? extends InputT>, PCollectionTuple>>
-          application)
-      throws IOException {
-
-    DoFn<InputT, OutputT> fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
+  private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(
+      MultiOutput<InputT, OutputT> transform) {
 
+    DoFn<InputT, OutputT> fn = transform.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
     if (signature.processElement().isSplittable()) {
-      return (PTransform) SplittableParDo.forAppliedParDo(application);
+      return new SplittableParDo(transform);
     } else if (signature.stateDeclarations().size() > 0
         || signature.timerDeclarations().size() > 0) {
-      return new GbkThenStatefulParDo(
-          fn,
-          ParDoTranslation.getMainOutputTag(application),
-          ParDoTranslation.getAdditionalOutputTags(application),
-          ParDoTranslation.getSideInputs(application));
+      // Based on the fact that the signature is stateful, DoFnSignatures ensures
+      // that it is also keyed
+      MultiOutput<KV<?, ?>, OutputT> keyedTransform =
+          (MultiOutput<KV<?, ?>, OutputT>) transform;
+
+      return new GbkThenStatefulParDo(keyedTransform);
     } else {
-      return application.getTransform();
+      return transform;
     }
   }
 
@@ -116,25 +101,10 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
   static class GbkThenStatefulParDo<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
-    private final transient DoFn<KV<K, InputT>, OutputT> doFn;
-    private final TupleTagList additionalOutputTags;
-    private final TupleTag<OutputT> mainOutputTag;
-    private final List<PCollectionView<?>> sideInputs;
-
-    public GbkThenStatefulParDo(
-        DoFn<KV<K, InputT>, OutputT> doFn,
-        TupleTag<OutputT> mainOutputTag,
-        TupleTagList additionalOutputTags,
-        List<PCollectionView<?>> sideInputs) {
-      this.doFn = doFn;
-      this.additionalOutputTags = additionalOutputTags;
-      this.mainOutputTag = mainOutputTag;
-      this.sideInputs = sideInputs;
-    }
+    private final MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;
 
-    @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
+    public GbkThenStatefulParDo(MultiOutput<KV<K, InputT>, OutputT> underlyingParDo) {
+      this.underlyingParDo = underlyingParDo;
     }
 
     @Override
@@ -190,9 +160,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
           adjustedInput
               // Explode the resulting iterable into elements that are exactly the ones from
               // the input
-              .apply(
-              "Stateful ParDo",
-              new StatefulParDo<>(doFn, mainOutputTag, additionalOutputTags, sideInputs));
+              .apply("Stateful ParDo", new StatefulParDo<>(underlyingParDo, input));
 
       return outputs;
     }
@@ -204,41 +172,25 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
   static class StatefulParDo<K, InputT, OutputT>
       extends PTransformTranslation.RawPTransform<
           PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
-    private final transient DoFn<KV<K, InputT>, OutputT> doFn;
-    private final TupleTagList additionalOutputTags;
-    private final TupleTag<OutputT> mainOutputTag;
-    private final List<PCollectionView<?>> sideInputs;
+    private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;
+    private final transient PCollection<KV<K, InputT>> originalInput;
 
     public StatefulParDo(
-        DoFn<KV<K, InputT>, OutputT> doFn,
-        TupleTag<OutputT> mainOutputTag,
-        TupleTagList additionalOutputTags,
-        List<PCollectionView<?>> sideInputs) {
-      this.doFn = doFn;
-      this.mainOutputTag = mainOutputTag;
-      this.additionalOutputTags = additionalOutputTags;
-      this.sideInputs = sideInputs;
-    }
-
-    public DoFn<KV<K, InputT>, OutputT> getDoFn() {
-      return doFn;
-    }
-
-    public TupleTag<OutputT> getMainOutputTag() {
-      return mainOutputTag;
-    }
-
-    public List<PCollectionView<?>> getSideInputs() {
-      return sideInputs;
+        MultiOutput<KV<K, InputT>, OutputT> underlyingParDo,
+        PCollection<KV<K, InputT>> originalInput) {
+      this.underlyingParDo = underlyingParDo;
+      this.originalInput = originalInput;
     }
 
-    public TupleTagList getAdditionalOutputTags() {
-      return additionalOutputTags;
+    public MultiOutput<KV<K, InputT>, OutputT> getUnderlyingParDo() {
+      return underlyingParDo;
     }
 
     @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
+    public <T> Coder<T> getDefaultOutputCoder(
+        PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, PCollection<T> output)
+        throws CannotProvideCoderException {
+      return underlyingParDo.getDefaultOutputCoder(originalInput, output);
     }
 
     @Override
@@ -247,7 +199,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       PCollectionTuple outputs =
           PCollectionTuple.ofPrimitiveOutputsInternal(
               input.getPipeline(),
-              TupleTagList.of(getMainOutputTag()).and(getAdditionalOutputTags().getAll()),
+              TupleTagList.of(underlyingParDo.getMainOutputTag())
+                  .and(underlyingParDo.getAdditionalOutputTags().getAll()),
               input.getWindowingStrategy(),
               input.isBounded());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index e6b51b7..b85f481c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -54,7 +54,8 @@ import org.joda.time.Instant;
 class SplittableProcessElementsEvaluatorFactory<
         InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
     implements TransformEvaluatorFactory {
-  private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
+  private final ParDoEvaluatorFactory<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
       delegateFactory;
   private final EvaluationContext evaluationContext;
 
@@ -83,13 +84,14 @@ class SplittableProcessElementsEvaluatorFactory<
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
-  private TransformEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> createEvaluator(
-      AppliedPTransform<
-              PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>, PCollectionTuple,
-              ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
-          application,
-      CommittedBundle<InputT> inputBundle)
-      throws Exception {
+  private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+      createEvaluator(
+          AppliedPTransform<
+                  PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
+                  PCollectionTuple, ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
+              application,
+          CommittedBundle<InputT> inputBundle)
+          throws Exception {
     final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
         application.getTransform();
 
@@ -99,7 +101,9 @@ class SplittableProcessElementsEvaluatorFactory<
     DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
     processFn =
         ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
-            fnManager.<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>get());
+            fnManager
+                .<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+                    get());
 
     String stepName = evaluationContext.getStepName(application);
     final DirectExecutionContext.DirectStepContext stepContext =
@@ -107,13 +111,11 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getStepContext(stepName);
 
-    final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>>
+    final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,
                 inputBundle.getKey(),
-                (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>)
-                    inputBundle.getPCollection(),
                 transform.getSideInputs(),
                 transform.getMainOutputTag(),
                 transform.getAdditionalOutputTags().getAll(),
@@ -185,16 +187,17 @@ class SplittableProcessElementsEvaluatorFactory<
   }
 
   private static <InputT, OutputT, RestrictionT>
-      ParDoEvaluator.DoFnRunnerFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
+  ParDoEvaluator.DoFnRunnerFactory<
+                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
           processFnRunnerFactory() {
     return new ParDoEvaluator.DoFnRunnerFactory<
-        KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>() {
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
       @Override
       public PushbackSideInputDoFnRunner<
-          KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
       createRunner(
           PipelineOptions options,
-          DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> fn,
+          DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn,
           List<PCollectionView<?>> sideInputs,
           ReadyCheckingSideInputReader sideInputReader,
           OutputManager outputManager,

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index bdec9c8..506c84c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -98,7 +98,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       throws Exception {
 
     final DoFn<KV<K, InputT>, OutputT> doFn =
-        application.getTransform().getDoFn();
+        application.getTransform().getUnderlyingParDo().getFn();
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
     // If the DoFn is stateful, schedule state clearing.
@@ -117,12 +117,11 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
     DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator =
         delegateFactory.createEvaluator(
             (AppliedPTransform) application,
-            (PCollection) inputBundle.getPCollection(),
             inputBundle.getKey(),
             doFn,
-            application.getTransform().getSideInputs(),
-            application.getTransform().getMainOutputTag(),
-            application.getTransform().getAdditionalOutputTags().getAll());
+            application.getTransform().getUnderlyingParDo().getSideInputs(),
+            application.getTransform().getUnderlyingParDo().getMainOutputTag(),
+            application.getTransform().getUnderlyingParDo().getAdditionalOutputTags().getAll());
 
     return new StatefulParDoEvaluator<>(delegateEvaluator);
   }
@@ -152,11 +151,12 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
                   transformOutputWindow
                       .getTransform()
                       .getTransform()
+                      .getUnderlyingParDo()
                       .getMainOutputTag());
       WindowingStrategy<?, ?> windowingStrategy = pc.getWindowingStrategy();
       BoundedWindow window = transformOutputWindow.getWindow();
       final DoFn<?, ?> doFn =
-          transformOutputWindow.getTransform().getTransform().getDoFn();
+          transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getFn();
       final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
       final DirectStepContext stepContext =

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 16c8589..2da7a71 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,7 +22,6 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -31,7 +30,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.TestStreamTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.testing.TestStream;
@@ -162,8 +160,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
   }
 
   static class DirectTestStreamFactory<T>
-      implements PTransformOverrideFactory<
-          PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
     private final DirectRunner runner;
 
     DirectTestStreamFactory(DirectRunner runner) {
@@ -172,17 +169,10 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
 
     @Override
     public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
-      try {
-        return PTransformReplacement.of(
-            transform.getPipeline().begin(),
-            new DirectTestStream<T>(runner, TestStreamTranslation.getTestStream(transform)));
-      } catch (IOException exc) {
-        throw new RuntimeException(
-            String.format(
-                "Transform could not be converted to %s", TestStream.class.getSimpleName()),
-            exc);
-      }
+        AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(),
+          new DirectTestStream<T>(runner, transform.getTransform()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 8a281a7..057f4a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link CreatePCollectionView}
@@ -59,13 +60,12 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   public void cleanup() throws Exception {}
 
   private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<Iterable<InT>>, PCollection<Iterable<InT>>, WriteView<InT, OuT>>
+      final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
           application) {
     PCollection<Iterable<InT>> input =
         (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs().values());
-    final PCollectionViewWriter<InT, OuT> writer =
-        context.createPCollectionViewWriter(input, application.getTransform().getView());
+    final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input,
+        (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs().values()));
     return new TransformEvaluator<Iterable<InT>>() {
       private final List<WindowedValue<InT>> elements = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 5dcf016..fdff63d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,12 +18,11 @@
 
 package org.apache.beam.runners.direct;
 
-import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
+import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -44,56 +43,46 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 class ViewOverrideFactory<ElemT, ViewT>
     implements PTransformOverrideFactory<
-    PCollection<ElemT>, PCollection<ElemT>,
-        PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
+        PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
 
   @Override
-  public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
+  public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>,
-              PTransform<PCollection<ElemT>, PCollection<ElemT>>>
+              PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
           transform) {
-
-    PCollectionView<ViewT> view;
-    try {
-      view = CreatePCollectionViewTranslation.getView(transform);
-    } catch (IOException exc) {
-      throw new RuntimeException(
-          String.format(
-              "Could not extract %s from transform %s",
-              PCollectionView.class.getSimpleName(), transform),
-          exc);
-    }
-
-      return PTransformReplacement.of(
+    return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new GroupAndWriteView<ElemT, ViewT>(view));
+        new GroupAndWriteView<>(transform.getTransform()));
   }
 
   @Override
   public Map<PValue, ReplacementOutput> mapOutputs(
-      Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
-    return ReplacementOutputs.singleton(outputs, newOutput);
+      Map<TupleTag<?>, PValue> outputs, PCollectionView<ViewT> newOutput) {
+    return Collections.emptyMap();
   }
 
   /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
   static class GroupAndWriteView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
-    private final PCollectionView<ViewT> view;
+      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+    private final CreatePCollectionView<ElemT, ViewT> og;
 
-    private GroupAndWriteView(PCollectionView<ViewT> view) {
-      this.view = view;
+    private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
     }
 
     @Override
-    public PCollection<ElemT> expand(final PCollection<ElemT> input) {
-      input
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+      return input
           .apply(WithKeys.<Void, ElemT>of((Void) null))
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
           .apply(GroupByKey.<Void, ElemT>create())
           .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(view));
-      return input;
+          .apply(new WriteView<ElemT, ViewT>(og));
+    }
+
+    @Override
+    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+      return og;
     }
   }
 
@@ -105,24 +94,22 @@ class ViewOverrideFactory<ElemT, ViewT>
    * to {@link ViewT}.
    */
   static final class WriteView<ElemT, ViewT>
-      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
-    private final PCollectionView<ViewT> view;
+      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
+    private final CreatePCollectionView<ElemT, ViewT> og;
 
-    WriteView(PCollectionView<ViewT> view) {
-      this.view = view;
+    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
     }
 
     @Override
     @SuppressWarnings("deprecation")
-    public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
-      return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+    public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
+      return og.getView();
     }
 
     @SuppressWarnings("deprecation")
     public PCollectionView<ViewT> getView() {
-      return view;
+      return og.getView();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 599b74f..40ce163 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -54,7 +54,6 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -63,6 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
@@ -831,11 +831,11 @@ class WatermarkManager {
 
   private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    Collection<PValue> inputs = TransformInputs.nonAdditionalInputs(transform);
+    Map<TupleTag<?>, PValue> inputs = transform.getInputs();
     if (inputs.isEmpty()) {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
-    for (PValue pvalue : inputs) {
+    for (PValue pvalue : inputs.values()) {
       Watermark producerOutputWatermark =
           getValueWatermark(pvalue).synchronizedProcessingOutputWatermark;
       inputWmsBuilder.add(producerOutputWatermark);
@@ -845,11 +845,11 @@ class WatermarkManager {
 
   private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    Collection< PValue> inputs = TransformInputs.nonAdditionalInputs(transform);
+    Map<TupleTag<?>, PValue> inputs = transform.getInputs();
     if (inputs.isEmpty()) {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
-    for (PValue pvalue : inputs) {
+    for (PValue pvalue : inputs.values()) {
       Watermark producerOutputWatermark = getValueWatermark(pvalue).outputWatermark;
       inputWatermarksBuilder.add(producerOutputWatermark);
     }
@@ -987,16 +987,16 @@ class WatermarkManager {
     // refresh.
     for (CommittedBundle<?> bundle : result.getOutputs()) {
       for (AppliedPTransform<?, ?, ?> consumer :
-          graph.getPerElementConsumers(bundle.getPCollection())) {
+          graph.getPrimitiveConsumers(bundle.getPCollection())) {
         TransformWatermarks watermarks = transformToWatermarks.get(consumer);
         watermarks.addPending(bundle);
       }
     }
 
     TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
-    if (result.getUnprocessedInputs().isPresent()) {
+    if (input != null) {
       // Add the unprocessed inputs
-      completedTransform.addPending(result.getUnprocessedInputs().get());
+      completedTransform.addPending(result.getUnprocessedInputs());
     }
     completedTransform.updateTimers(timerUpdate);
     if (input != null) {
@@ -1035,7 +1035,7 @@ class WatermarkManager {
     if (updateResult.isAdvanced()) {
       Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
       for (PValue outputPValue : toRefresh.getOutputs().values()) {
-        additionalRefreshes.addAll(graph.getPerElementConsumers(outputPValue));
+        additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue));
       }
       return additionalRefreshes;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index ba796ae..65a5a19 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -21,13 +21,11 @@ package org.apache.beam.runners.direct;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.WriteFilesTranslation;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -45,35 +43,23 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles} {@link PTransform
- * PTransforms} with an unspecified number of shards with a write with a specified number of shards.
- * The number of shards is the log base 10 of the number of input records, with up to 2 additional
- * shards.
+ * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles}
+ * {@link PTransform PTransforms} with an unspecified number of shards with a write with a
+ * specified number of shards. The number of shards is the log base 10 of the number of input
+ * records, with up to 2 additional shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<
-        PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>> {
+    implements PTransformOverrideFactory<PCollection<InputT>, PDone, WriteFiles<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
 
   @Override
   public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
-      AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
-          transform) {
-    try {
-      WriteFiles<InputT, ?, ?> replacement =
-          WriteFiles.to(
-              WriteFilesTranslation.getSink(transform),
-              WriteFilesTranslation.getFormatFunction(transform));
-      if (WriteFilesTranslation.isWindowedWrites(transform)) {
-        replacement = replacement.withWindowedWrites();
-      }
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          replacement.withSharding(new LogElementShardsWithDrift<InputT>()));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+      AppliedPTransform<PCollection<InputT>, PDone, WriteFiles<InputT>> transform) {
+
+    return PTransformReplacement.of(
+        PTransformReplacements.getSingletonMainInput(transform),
+        transform.getTransform().withSharding(new LogElementShardsWithDrift<InputT>()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 8b95b34..cf19dc2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.beam.runners.direct;
 
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.Collections;
@@ -72,7 +72,7 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            Optional.<CommittedBundle<?>>absent(),
+            bundleFactory.createBundle(created).commit(Instant.now()),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
@@ -88,11 +88,11 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            Optional.of(bundle),
+            bundle,
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
-    assertThat(result.getUnprocessedInputs().get(),
+    assertThat(result.getUnprocessedInputs(),
         Matchers.<CommittedBundle<?>>equalTo(bundle));
   }
 
@@ -101,14 +101,11 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            Optional.<CommittedBundle<?>>absent(),
+            null,
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
-    assertThat(
-        result.getUnprocessedInputs(),
-        Matchers.<Optional<? extends CommittedBundle<?>>>equalTo(
-            Optional.<CommittedBundle<?>>absent()));
+    assertThat(result.getUnprocessedInputs(), nullValue());
   }
 
   @Test
@@ -123,7 +120,7 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            Optional.<CommittedBundle<?>>absent(),
+            bundleFactory.createBundle(created).commit(Instant.now()),
             outputs,
             EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index bf3e83e..576edf3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -151,13 +151,13 @@ public class DirectGraphVisitorTest implements Serializable {
         graph.getProducer(flattened);
 
     assertThat(
-        graph.getPerElementConsumers(created),
+        graph.getPrimitiveConsumers(created),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             transformedProducer, flattenedProducer));
     assertThat(
-        graph.getPerElementConsumers(transformed),
+        graph.getPrimitiveConsumers(transformed),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(flattenedProducer));
-    assertThat(graph.getPerElementConsumers(flattened), emptyIterable());
+    assertThat(graph.getPrimitiveConsumers(flattened), emptyIterable());
   }
 
   @Test
@@ -173,10 +173,10 @@ public class DirectGraphVisitorTest implements Serializable {
     AppliedPTransform<?, ?, ?> flattenedProducer = graph.getProducer(flattened);
 
     assertThat(
-        graph.getPerElementConsumers(created),
+        graph.getPrimitiveConsumers(created),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(flattenedProducer,
             flattenedProducer));
-    assertThat(graph.getPerElementConsumers(flattened), emptyIterable());
+    assertThat(graph.getPrimitiveConsumers(flattened), emptyIterable());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
index 7707f7f..43de091 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -26,12 +25,6 @@ import org.apache.beam.sdk.values.PValue;
 
 /** Test utilities for the {@link DirectRunner}. */
 final class DirectGraphs {
-  public static void performDirectOverrides(Pipeline p) {
-    p.replaceAll(
-        DirectRunner.fromOptions(PipelineOptionsFactory.create().as(DirectOptions.class))
-            .defaultTransformOverrides());
-  }
-
   public static DirectGraph getGraph(Pipeline p) {
     DirectGraphVisitor visitor = new DirectGraphVisitor();
     p.traverseTopologically(visitor);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
index 4b909bc..603e43e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
@@ -37,7 +37,7 @@ public class DirectRegistrarTest {
   @Test
   public void testCorrectOptionsAreReturned() {
     assertEquals(
-        ImmutableList.of(DirectOptions.class, DirectTestOptions.class),
+        ImmutableList.of(DirectOptions.class),
         new Options().getPipelineOptions());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 699a318..c0e43d6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -101,13 +101,14 @@ public class EvaluationContextTest {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(GenerateSequence.from(0));
 
-    p.replaceAll(runner.defaultTransformOverrides());
+    p.replaceAll(
+        DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
+            .defaultTransformOverrides());
 
     KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
     p.traverseTopologically(keyedPValueTrackingVisitor);
 
     BundleFactory bundleFactory = ImmutableListBundleFactory.create();
-    DirectGraphs.performDirectOverrides(p);
     graph = DirectGraphs.getGraph(p);
     context =
         EvaluationContext.create(
@@ -414,7 +415,7 @@ public class EvaluationContextTest {
         StepTransformResult.withoutHold(unboundedProducer).build());
     assertThat(context.isDone(), is(false));
 
-    for (AppliedPTransform<?, ?, ?> consumers : graph.getPerElementConsumers(created)) {
+    for (AppliedPTransform<?, ?, ?> consumers : graph.getPrimitiveConsumers(created)) {
       context.handleResult(
           committedBundle,
           ImmutableList.<TimerData>of(),