You are viewing a plain-text version of this content; the canonical, hyperlinked version is available at the original mailing-list archive (the link was dropped in this plain-text rendering).
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/24 00:08:42 UTC

[3/3] beam git commit: Always expand in AppliedPTransform

Always expand in AppliedPTransform

Never provide unexpanded input or output types from AppliedPTransform,
as they are not part of the model.

Update all runners to obtain appropriate PValues from the expansions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b062d71
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b062d71
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b062d71

Branch: refs/heads/master
Commit: 7b062d718f27ec6146958bfb06debefa6227bdb8
Parents: 26a2c47
Author: Thomas Groh <tg...@google.com>
Authored: Tue Dec 20 13:40:48 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jan 23 16:08:21 2017 -0800

----------------------------------------------------------------------
 .../translation/CreateValuesTranslator.java     | 18 +++----
 .../FlattenPCollectionTranslator.java           | 28 ++++++++---
 .../apex/translation/GroupByKeyTranslator.java  |  2 +-
 .../translation/ParDoBoundMultiTranslator.java  | 27 ++++++----
 .../apex/translation/ParDoBoundTranslator.java  |  4 +-
 .../apex/translation/TranslationContext.java    | 27 +++++++---
 .../apex/translation/WindowBoundTranslator.java |  4 +-
 .../direct/BoundedReadEvaluatorFactory.java     | 10 ++--
 .../beam/runners/direct/EvaluationContext.java  |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java |  4 +-
 .../runners/direct/FlattenEvaluatorFactory.java |  4 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |  5 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  | 10 +++-
 .../runners/direct/ParDoEvaluatorFactory.java   | 17 ++++++-
 .../direct/StatefulParDoEvaluatorFactory.java   | 12 +++--
 .../direct/TestStreamEvaluatorFactory.java      |  5 +-
 .../direct/UnboundedReadEvaluatorFactory.java   | 13 +++--
 .../runners/direct/ViewEvaluatorFactory.java    |  8 +--
 .../beam/runners/direct/WatermarkManager.java   |  6 +--
 .../runners/direct/WindowEvaluatorFactory.java  |  3 +-
 .../runners/direct/DirectGraphVisitorTest.java  | 16 +++---
 .../beam/runners/direct/ParDoEvaluatorTest.java |  3 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  4 +-
 .../FlinkBatchTransformTranslators.java         | 40 +++++++++++----
 .../FlinkBatchTranslationContext.java           | 21 +++++---
 .../FlinkStreamingTransformTranslators.java     | 46 ++++++++++-------
 .../FlinkStreamingTranslationContext.java       | 20 ++++++--
 .../dataflow/DataflowPipelineTranslator.java    | 53 +++++++++++++-------
 .../runners/dataflow/TransformTranslator.java   |  9 +++-
 .../dataflow/DataflowPipelineJobTest.java       | 29 +++++++----
 .../spark/translation/EvaluationContext.java    | 42 +++++++++-------
 .../spark/translation/TransformTranslator.java  | 34 +++++++++----
 .../streaming/StreamingTransformTranslator.java | 45 ++++++++++-------
 .../beam/sdk/transforms/AppliedPTransform.java  | 11 ++--
 34 files changed, 385 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
index ceae2b5..66fd27e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
@@ -20,11 +20,9 @@ package org.apache.beam.runners.apex.translation;
 
 import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import org.apache.beam.runners.apex.translation.utils.ValuesSource;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PBegin;
-
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * Wraps elements from Create.Values into an {@link UnboundedSource}.
@@ -35,14 +33,10 @@ class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>>
 
   @Override
   public void translate(Create.Values<T> transform, TranslationContext context) {
-    try {
-      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
-          transform.getDefaultOutputCoder((PBegin) context.getInput()));
-      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-          unboundedSource, context.getPipelineOptions());
-      context.addOperator(operator, operator.output);
-    } catch (CannotProvideCoderException e) {
-      throw new RuntimeException(e);
-    }
+    UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
+        ((PCollection<T>) context.getOutput()).getCoder());
+    ApexReadUnboundedInputOperator<T, ?> operator =
+        new ApexReadUnboundedInputOperator<>(unboundedSource, context.getPipelineOptions());
+    context.addOperator(operator, operator.output);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index eb24af9..928f135 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.runners.apex.translation;
 
-import com.google.common.collect.Lists;
+import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.runners.apex.translation.operators.ApexFlattenOperator;
 import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import org.apache.beam.runners.apex.translation.utils.ValuesSource;
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
@@ -43,10 +43,9 @@ class FlattenPCollectionTranslator<T> implements
 
   @Override
   public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
-    PCollectionList<T> input = context.getInput();
-    List<PCollection<T>> collections = input.getAll();
+    List<TaggedPValue> inputs = context.getInputs();
 
-    if (collections.isEmpty()) {
+    if (inputs.isEmpty()) {
       // create a dummy source that never emits anything
       @SuppressWarnings("unchecked")
       UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
@@ -55,10 +54,23 @@ class FlattenPCollectionTranslator<T> implements
           unboundedSource, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
     } else {
-      PCollection<T> output = context.getOutput();
+      PCollection<T> output = (PCollection<T>) context.getOutput();
       Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
-      flattenCollections(collections, unionTags, output, context);
+      flattenCollections(extractPCollections(inputs), unionTags, output, context);
+    }
+  }
+
+  private List<PCollection<T>> extractPCollections(List<TaggedPValue> inputs) {
+    List<PCollection<T>> collections = Lists.newArrayList();
+    for (TaggedPValue pv : inputs) {
+      checkArgument(
+          pv.getValue() instanceof PCollection,
+          "Non-PCollection provided as input to flatten: %s of type %s",
+          pv.getValue(),
+          pv.getClass().getSimpleName());
+      collections.add((PCollection<T>) pv.getValue());
     }
+    return collections;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
index 47d447a..b46e3eb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
@@ -31,7 +31,7 @@ class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>
 
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-    PCollection<KV<K, V>> input = context.getInput();
+    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
     ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
         input, context.<K>stateInternalsFactory()
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index bff7652..2439020 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,8 +76,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    PCollectionTuple output = context.getOutput();
-    PCollection<InputT> input = context.getInput();
+    List<TaggedPValue> outputs = context.getOutputs();
+    PCollection<InputT> input = (PCollection<InputT>) context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
     WindowedValueCoder<InputT> wvInputCoder =
@@ -90,21 +90,28 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
             doFn,
             transform.getMainOutputTag(),
             transform.getSideOutputTags().getAll(),
-            context.<PCollection<?>>getInput().getWindowingStrategy(),
+            ((PCollection<InputT>) context.getInput()).getWindowingStrategy(),
             sideInputs,
             wvInputCoder,
             context.<Void>stateInternalsFactory());
 
-    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
-    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      if (outputEntry.getKey() == transform.getMainOutputTag()) {
-        ports.put(outputEntry.getValue(), operator.output);
+    for (TaggedPValue output : outputs) {
+      checkArgument(
+          output.getValue() instanceof PCollection,
+          "%s %s outputs non-PCollection %s of type %s",
+          ParDo.BoundMulti.class.getSimpleName(),
+          context.getFullName(),
+          output.getValue(),
+          output.getValue().getClass().getSimpleName());
+      PCollection<?> pc = (PCollection<?>) output.getValue();
+      if (output.getTag().equals(transform.getMainOutputTag())) {
+        ports.put(pc, operator.output);
       } else {
         int portIndex = 0;
         for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
-          if (tag == outputEntry.getKey()) {
-            ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
+          if (tag.equals(output.getTag())) {
+            ports.put(pc, operator.sideOutputPorts[portIndex]);
             break;
           }
           portIndex++;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index 3b6eb6e..c24250f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -63,8 +63,8 @@ class ParDoBoundTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    PCollection<OutputT> output = context.getOutput();
-    PCollection<InputT> input = context.getInput();
+    PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
+    PCollection<InputT> input = (PCollection<InputT>) context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
     WindowedValueCoder<InputT> wvInputCoder =

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 3bf01a8..8c30bc6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -24,6 +24,7 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -40,7 +41,8 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -78,12 +80,24 @@ class TranslationContext {
     return pipelineOptions;
   }
 
-  public <InputT extends PInput> InputT getInput() {
-    return (InputT) getCurrentTransform().getInput();
+  public String getFullName() {
+    return getCurrentTransform().getFullName();
   }
 
-  public <OutputT extends POutput> OutputT getOutput() {
-    return (OutputT) getCurrentTransform().getOutput();
+  public List<TaggedPValue> getInputs() {
+    return getCurrentTransform().getInputs();
+  }
+
+  public PValue getInput() {
+    return Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
+  }
+
+  public List<TaggedPValue> getOutputs() {
+    return getCurrentTransform().getOutputs();
+  }
+
+  public PValue getOutput() {
+    return Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
   }
 
   private AppliedPTransform<?, ?, ?> getCurrentTransform() {
@@ -92,7 +106,7 @@ class TranslationContext {
   }
 
   public void addOperator(Operator operator, OutputPort port) {
-    addOperator(operator, port, this.<PCollection<?>>getOutput());
+    addOperator(operator, port, (PCollection<?>) getOutput());
   }
 
   /**
@@ -170,5 +184,4 @@ class TranslationContext {
   public <K> StateInternalsFactory<K> stateInternalsFactory() {
     return new ApexStateInternals.ApexStateInternalsFactory();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index 50af81d..a241cad 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -43,8 +43,8 @@ class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
 
   @Override
   public void translate(Window.Bound<T> transform, TranslationContext context) {
-    PCollection<T> output = context.getOutput();
-    PCollection<T> input = context.getInput();
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    PCollection<T> input = (PCollection<T>) context.getInput();
     @SuppressWarnings("unchecked")
     WindowingStrategy<T, BoundedWindow> windowingStrategy =
         (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 8874a04..57735e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
 import java.util.Collection;
@@ -102,7 +103,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    */
   private static class BoundedReadEvaluator<OutputT>
       implements TransformEvaluator<BoundedSourceShard<OutputT>> {
-    private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
+    private final PCollection<OutputT> outputPCollection;
     private final EvaluationContext evaluationContext;
     private Builder resultBuilder;
 
@@ -114,9 +115,10 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         EvaluationContext evaluationContext,
         long minimumDynamicSplitSize,
         ExecutorService executor) {
-      this.transform = transform;
       this.evaluationContext = evaluationContext;
-      resultBuilder = StepTransformResult.withoutHold(transform);
+      this.outputPCollection =
+          (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
+      this.resultBuilder = StepTransformResult.withoutHold(transform);
       this.minimumDynamicSplitSize = minimumDynamicSplitSize;
       this.produceSplitExecutor = executor;
     }
@@ -129,7 +131,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           source.createReader(evaluationContext.getPipelineOptions())) {
         boolean contentsRemaining = reader.start();
         Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader);
-        UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
+        UncommittedBundle<OutputT> output = evaluationContext.createBundle(outputPCollection);
         while (contentsRemaining) {
           output.add(
               WindowedValue.timestampedValueInGlobalWindow(

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 3b9367a..0e89a67 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -420,7 +420,7 @@ class EvaluationContext {
     }
     // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
     // the PTransform may produce additional output. It is not done.
-    for (TaggedPValue output : transform.getOutput().expand()) {
+    for (TaggedPValue output : transform.getOutputs()) {
       if (output.getValue() instanceof PCollection) {
         IsBounded bounded = ((PCollection<?>) output.getValue()).isBounded();
         if (bounded.equals(IsBounded.UNBOUNDED)

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 5a653b7..20d562f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -457,7 +457,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
               evaluationContext
                   .createKeyedBundle(
                       transformTimers.getKey(),
-                      (PCollection) transformTimers.getTransform().getInput())
+                      (PCollection)
+                          Iterables.getOnlyElement(transformTimers.getTransform().getInputs())
+                              .getValue())
                   .add(WindowedValue.valueInGlobalWindow(work))
                   .commit(evaluationContext.now());
           scheduleConsumption(

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 817e736..66862ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.collect.Iterables;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -55,7 +56,8 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
               PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
           application) {
     final UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(application.getOutput());
+        evaluationContext.createBundle(
+            (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
     final TransformResult<InputT> result =
         StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
     return new FlattenEvaluator<>(outputBundle, result);

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 2bc0d8d..b97729a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -159,7 +159,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       K key = workItem.key();
 
       UncommittedBundle<KV<K, Iterable<V>>> bundle =
-          evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
+          evaluationContext.createKeyedBundle(
+              structuralKey,
+              (PCollection<KV<K, Iterable<V>>>)
+                  Iterables.getOnlyElement(application.getOutputs()).getValue());
       outputBundles.add(bundle);
       CopyOnAccessInMemoryStateInternals<K> stateInternals =
           (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 20d619f..ef96a92 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -102,7 +103,10 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
             DirectGroupByKeyOnly<K, V>> application) {
       this.evaluationContext = evaluationContext;
       this.application = application;
-      this.keyCoder = getKeyCoder(application.getInput().getCoder());
+      this.keyCoder =
+          getKeyCoder(
+              ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs()).getValue())
+                  .getCoder());
       this.groupingMap = new HashMap<>();
     }
 
@@ -152,7 +156,9 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
             KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
         UncommittedBundle<KeyedWorkItem<K, V>> bundle =
             evaluationContext.createKeyedBundle(
-                StructuralKey.of(key, keyCoder), application.getOutput());
+                StructuralKey.of(key, keyCoder),
+                (PCollection<KeyedWorkItem<K, V>>)
+                    Iterables.getOnlyElement(application.getOutputs()).getValue());
         bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
         resultBuilder.addOutput(bundle);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 2fc19b7..b028766 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -20,7 +20,10 @@ package org.apache.beam.runners.direct;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -29,6 +32,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,13 +141,14 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           evaluationContext,
           stepContext,
           application,
-          application.getInput().getWindowingStrategy(),
+          ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs()).getValue())
+              .getWindowingStrategy(),
           fn,
           key,
           sideInputs,
           mainOutputTag,
           sideOutputTags,
-          application.getOutput().getAll());
+          pcollections(application.getOutputs()));
     } catch (Exception e) {
       try {
         fnManager.remove();
@@ -157,6 +162,14 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
   }
 
+  private Map<TupleTag<?>, PCollection<?>> pcollections(List<TaggedPValue> outputs) {
+    Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
+    for (TaggedPValue output : outputs) {
+      pcs.put(output.getTag(), (PCollection<?>) output.getValue());
+    }
+    return pcs;
+  }
+
   public DoFnLifecycleManager getManagerForCloneOf(DoFn<?, ?> fn) {
     return fnClones.getUnchecked(fn);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 9582d5c..cce97a6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -24,6 +24,8 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -48,6 +50,8 @@ import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
 final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements TransformEvaluatorFactory {
@@ -132,10 +136,12 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
         final AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> transformOutputWindow) {
       String stepName = evaluationContext.getStepName(transformOutputWindow.getTransform());
 
+      Map<TupleTag<?>, PCollection<?>> taggedValues = new HashMap<>();
+      for (TaggedPValue pv : transformOutputWindow.getTransform().getOutputs()) {
+        taggedValues.put(pv.getTag(), (PCollection<?>) pv.getValue());
+      }
       PCollection<?> pc =
-          transformOutputWindow
-              .getTransform()
-              .getOutput()
+          taggedValues
               .get(
                   transformOutputWindow
                       .getTransform()

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 6ba65bf..bdf293f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -103,7 +104,9 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       Event<T> event = events.get(index);
 
       if (event.getType().equals(EventType.ELEMENT)) {
-        UncommittedBundle<T> bundle = context.createBundle(application.getOutput());
+        UncommittedBundle<T> bundle =
+            context.createBundle(
+                (PCollection<T>) Iterables.getOnlyElement(application.getOutputs()).getValue());
         for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
           bundle.add(
               WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 013e929..ff59390 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.collect.Iterables.getOnlyElement;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -115,7 +118,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     @Override
     public void processElement(
         WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
-      UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
+      UncommittedBundle<OutputT> output =
+          evaluationContext.createBundle(
+              (PCollection<OutputT>) getOnlyElement(transform.getOutputs()).getValue());
       UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
       UnboundedReader<OutputT> reader = null;
       try {
@@ -203,10 +208,12 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       // If the watermark is the max value, this source may not be invoked again. Finalize after
       // committing the output.
       if (!reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        PCollection<OutputT> outputPc =
+            (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
         evaluationContext.scheduleAfterOutputWouldBeProduced(
-            transform.getOutput(),
+            outputPc,
             GlobalWindow.INSTANCE,
-            transform.getOutput().getWindowingStrategy(),
+            outputPc.getWindowingStrategy(),
             new Runnable() {
               @Override
               public void run() {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 96a18d7..fcd8423 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
@@ -68,9 +69,10 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
       final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
           application) {
-    PCollection<Iterable<InT>> input = application.getInput();
-    final PCollectionViewWriter<InT, OuT> writer =
-        context.createPCollectionViewWriter(input, application.getOutput());
+    PCollection<Iterable<InT>> input =
+        (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs()).getValue();
+    final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input,
+        (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
     return new TransformEvaluator<Iterable<InT>>() {
       private final List<WindowedValue<InT>> elements = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 248fafd..ae15285 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -817,7 +817,7 @@ public class WatermarkManager {
 
   private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    List<TaggedPValue> inputs = transform.getInput().expand();
+    List<TaggedPValue> inputs = transform.getInputs();
     if (inputs.isEmpty()) {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
@@ -832,7 +832,7 @@ public class WatermarkManager {
 
   private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    List<TaggedPValue> inputs = transform.getInput().expand();
+    List<TaggedPValue> inputs = transform.getInputs();
     if (inputs.isEmpty()) {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
@@ -1022,7 +1022,7 @@ public class WatermarkManager {
     WatermarkUpdate updateResult = myWatermarks.refresh();
     if (updateResult.isAdvanced()) {
       Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
-      for (TaggedPValue outputPValue : toRefresh.getOutput().expand()) {
+      for (TaggedPValue outputPValue : toRefresh.getOutputs()) {
         additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue.getValue()));
       }
       return additionalRefreshes;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 991addf..4ca556b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -57,7 +57,8 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
       AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform) {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
-        evaluationContext.createBundle(transform.getOutput());
+        evaluationContext.createBundle(
+            (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue());
     if (fn == null) {
       return PassthroughTransformEvaluator.create(transform, outputBundle);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 01d11a3..df49796 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
@@ -40,13 +39,13 @@ import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -88,7 +87,7 @@ public class DirectGraphVisitorTest implements Serializable {
   }
 
   @Test
-  public void getRootTransformsContainsPBegins() {
+  public void getRootTransformsContainsRootTransforms() {
     PCollection<String> created = p.apply(Create.of("foo", "bar"));
     PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));
     PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
@@ -101,8 +100,11 @@ public class DirectGraphVisitorTest implements Serializable {
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)));
     for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms())  {
-      assertTrue(root.getInput() instanceof PBegin);
-      assertThat(root.getOutput(), Matchers.<POutput>isOneOf(created, counted, unCounted));
+      // Root transforms will have no inputs
+      assertThat(root.getInputs(), emptyIterable());
+      assertThat(
+          Iterables.getOnlyElement(root.getOutputs()).getValue(),
+          Matchers.<POutput>isOneOf(created, counted, unCounted));
     }
   }
 
@@ -119,8 +121,8 @@ public class DirectGraphVisitorTest implements Serializable {
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(graph.getProducer(empty)));
     AppliedPTransform<?, ?, ?> onlyRoot = Iterables.getOnlyElement(graph.getRootTransforms());
     assertThat(onlyRoot.getTransform(), Matchers.<PTransform<?, ?>>equalTo(flatten));
-    assertThat(onlyRoot.getInput(), Matchers.<PInput>equalTo(emptyList));
-    assertThat(onlyRoot.getOutput(), Matchers.<POutput>equalTo(empty));
+    assertThat(onlyRoot.getInputs(), Matchers.<TaggedPValue>emptyIterable());
+    assertThat(onlyRoot.getOutputs(), equalTo(empty.expand()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index b3aceeb..c85b85e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -162,7 +162,8 @@ public class ParDoEvaluatorTest {
         evaluationContext,
         stepContext,
         transform,
-        transform.getInput().getWindowingStrategy(),
+        ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs()).getValue())
+            .getWindowingStrategy(),
         fn,
         null /* key */,
         ImmutableList.<PCollectionView<?>>of(singletonView),

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index b88d5e0..94ffbd3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -290,7 +290,9 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
 
     CommittedBundle<KeyedWorkItem<String, KV<String, Integer>>> inputBundle =
         BUNDLE_FACTORY
-            .createBundle(producingTransform.getInput())
+            .createBundle(
+                (PCollection<KeyedWorkItem<String, KV<String, Integer>>>)
+                    Iterables.getOnlyElement(producingTransform.getInputs()).getValue())
             .add(gbkOutputElement)
             .commit(Instant.now());
     TransformEvaluator<KeyedWorkItem<String, KV<String, Integer>>> evaluator =

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index eb625b2..654b464 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
@@ -69,6 +72,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -190,7 +194,8 @@ class FlinkBatchTransformTranslators {
       Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
           new Concatenate<InputT>().asKeyedFn();
 
-      KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+      KvCoder<K, InputT> inputCoder =
+          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
 
       Coder<List<InputT>> accumulatorCoder;
 
@@ -365,7 +370,8 @@ class FlinkBatchTransformTranslators {
       CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
           (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
 
-      KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+      KvCoder<K, InputT> inputCoder =
+          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
 
       Coder<AccumT> accumulatorCoder;
 
@@ -567,15 +573,15 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+      List<TaggedPValue> outputs = context.getOutputs(transform);
 
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
       // put the main output at index 0, FlinkMultiOutputDoFnFunction  expects this
       outputMap.put(transform.getMainOutputTag(), 0);
       int count = 1;
-      for (TupleTag<?> tag: outputs.keySet()) {
-        if (!outputMap.containsKey(tag)) {
-          outputMap.put(tag, count++);
+      for (TaggedPValue taggedValue : outputs) {
+        if (!outputMap.containsKey(taggedValue)) {
+          outputMap.put(taggedValue.getTag(), count++);
         }
       }
 
@@ -584,7 +590,13 @@ class FlinkBatchTransformTranslators {
 
       // collect all output Coders and create a UnionCoder for our tagged outputs
       List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PCollection<?> coll: outputs.values()) {
+      for (TaggedPValue taggedValue : outputs) {
+        checkState(
+            taggedValue.getValue() instanceof PCollection,
+            "Within ParDo, got a non-PCollection output %s of type %s",
+            taggedValue.getValue(),
+            taggedValue.getValue().getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedValue.getValue();
         outputCoders.add(coll.getCoder());
         windowingStrategy = coll.getWindowingStrategy();
       }
@@ -628,11 +640,11 @@ class FlinkBatchTransformTranslators {
 
       transformSideInputs(sideInputs, taggedDataSet, context);
 
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+      for (TaggedPValue output : outputs) {
         pruneOutput(
             taggedDataSet,
             context,
-            outputMap.get(output.getKey()),
+            outputMap.get(output.getTag()),
             (PCollection) output.getValue());
       }
     }
@@ -668,7 +680,7 @@ class FlinkBatchTransformTranslators {
         Flatten.FlattenPCollectionList<T> transform,
         FlinkBatchTranslationContext context) {
 
-      List<PCollection<T>> allInputs = context.getInput(transform).getAll();
+      List<TaggedPValue> allInputs = context.getInputs(transform);
       DataSet<WindowedValue<T>> result = null;
 
       if (allInputs.isEmpty()) {
@@ -689,7 +701,13 @@ class FlinkBatchTransformTranslators {
                     (Coder<T>) VoidCoder.of(),
                     GlobalWindow.Coder.INSTANCE)));
       } else {
-        for (PCollection<T> collection : allInputs) {
+        for (TaggedPValue taggedPc : allInputs) {
+          checkArgument(
+              taggedPc.getValue() instanceof PCollection,
+              "Got non-PCollection input to flatten: %s of type %s",
+              taggedPc.getValue(),
+              taggedPc.getValue().getClass().getSimpleName());
+          PCollection<T> collection = (PCollection<T>) taggedPc.getValue();
           DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
           if (result == null) {
             result = current;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 611f5e6..1f91e5e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation;
 
+import com.google.common.collect.Iterables;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
@@ -28,9 +30,8 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -133,13 +134,21 @@ public class FlinkBatchTranslationContext {
     return new CoderTypeInformation<>(windowedValueCoder);
   }
 
+  List<TaggedPValue> getInputs(PTransform<?, ?> transform) {
+    return currentTransform.getInputs();
+  }
+
   @SuppressWarnings("unchecked")
-  <T extends PInput> T getInput(PTransform<T, ?> transform) {
-    return (T) currentTransform.getInput();
+  <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
+  }
+
+  List<TaggedPValue> getOutputs(PTransform<?, ?> transform) {
+    return currentTransform.getOutputs();
   }
 
   @SuppressWarnings("unchecked")
-  <T extends POutput> T getOutput(PTransform<?, T> transform) {
-    return (T) currentTransform.getOutput();
+  <T extends PValue> T getOutput(PTransform<?, T> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index ffa6d16..24ef987 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.flink.translation;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.nio.ByteBuffer;
@@ -27,7 +29,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
@@ -72,6 +73,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -497,10 +499,10 @@ public class FlinkStreamingTransformTranslators {
       WindowingStrategy<?, ?> windowingStrategy =
           context.getInput(transform).getWindowingStrategy();
 
-      Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+      List<TaggedPValue> outputs = context.getOutputs(transform);
 
       Map<TupleTag<?>, Integer> tagsToLabels =
-          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs.keySet());
+          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs);
 
       List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
@@ -525,7 +527,7 @@ public class FlinkStreamingTransformTranslators {
                 Collections.<PCollectionView<?>>emptyList(), /* side inputs */
                 context.getPipelineOptions());
 
-        UnionCoder outputUnionCoder = createUnionCoder(outputs.values());
+        UnionCoder outputUnionCoder = createUnionCoder(outputs);
 
         CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
             new CoderTypeInformation<>(outputUnionCoder);
@@ -552,7 +554,7 @@ public class FlinkStreamingTransformTranslators {
                 sideInputs,
                 context.getPipelineOptions());
 
-        UnionCoder outputUnionCoder = createUnionCoder(outputs.values());
+        UnionCoder outputUnionCoder = createUnionCoder(outputs);
 
         CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
             new CoderTypeInformation<>(outputUnionCoder);
@@ -573,11 +575,10 @@ public class FlinkStreamingTransformTranslators {
                 }
               });
 
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
-        final int outputTag = tagsToLabels.get(output.getKey());
+      for (TaggedPValue output : outputs) {
+        final int outputTag = tagsToLabels.get(output.getTag());
 
-        TypeInformation outputTypeInfo =
-            context.getTypeInfo(output.getValue());
+        TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
 
         @SuppressWarnings("unchecked")
         DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
@@ -594,22 +595,28 @@ public class FlinkStreamingTransformTranslators {
 
     private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
         TupleTag<?> mainTag,
-        Set<TupleTag<?>> secondaryTags) {
+        List<TaggedPValue> allTaggedValues) {
 
       Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
       int count = 0;
       tagToLabelMap.put(mainTag, count++);
-      for (TupleTag<?> tag : secondaryTags) {
-        if (!tagToLabelMap.containsKey(tag)) {
-          tagToLabelMap.put(tag, count++);
+      for (TaggedPValue taggedPValue : allTaggedValues) {
+        if (!tagToLabelMap.containsKey(taggedPValue.getTag())) {
+          tagToLabelMap.put(taggedPValue.getTag(), count++);
         }
       }
       return tagToLabelMap;
     }
 
-    private UnionCoder createUnionCoder(Collection<PCollection<?>> taggedCollections) {
+    private UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
       List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PCollection<?> coll : taggedCollections) {
+      for (TaggedPValue taggedColl : taggedCollections) {
+        checkArgument(
+            taggedColl.getValue() instanceof PCollection,
+            "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
+            PCollection.class.getSimpleName(),
+            taggedColl.getValue().getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedColl.getValue();
         WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
             WindowedValue.getFullCoder(
                 coll.getCoder(),
@@ -648,7 +655,8 @@ public class FlinkStreamingTransformTranslators {
 
       @SuppressWarnings("unchecked")
       WindowingStrategy<T, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<T, BoundedWindow>) context.getOutput(transform).getWindowingStrategy();
+          (WindowingStrategy<T, BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
 
       TypeInformation<WindowedValue<T>> typeInfo =
           context.getTypeInfo(context.getOutput(transform));
@@ -940,7 +948,7 @@ public class FlinkStreamingTransformTranslators {
     public void translateNode(
         Flatten.FlattenPCollectionList<T> transform,
         FlinkStreamingTranslationContext context) {
-      List<PCollection<T>> allInputs = context.getInput(transform).getAll();
+      List<TaggedPValue> allInputs = context.getInputs(transform);
 
       if (allInputs.isEmpty()) {
 
@@ -967,8 +975,8 @@ public class FlinkStreamingTransformTranslators {
 
       } else {
         DataStream<T> result = null;
-        for (PCollection<T> collection : allInputs) {
-          DataStream<T> current = context.getInputDataStream(collection);
+        for (TaggedPValue input : allInputs) {
+          DataStream<T> current = context.getInputDataStream(input.getValue());
           result = (result == null) ? current : result.union(current);
         }
         context.setOutputDataStream(context.getOutput(transform), result);

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
index bc80d42..6db252e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
@@ -19,7 +19,9 @@ package org.apache.beam.runners.flink.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.collect.Iterables;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -99,12 +102,21 @@ public class FlinkStreamingTranslationContext {
 
 
   @SuppressWarnings("unchecked")
-  public <T extends PInput> T getInput(PTransform<T, ?> transform) {
-    return (T) currentTransform.getInput();
+  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
+  }
+
+  public <T extends PInput> List<TaggedPValue> getInputs(PTransform<T, ?> transform) {
+    return currentTransform.getInputs();
+  }
 
   @SuppressWarnings("unchecked")
-  public <T extends POutput> T getOutput(PTransform<?, T> transform) {
-    return (T) currentTransform.getOutput();
+  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
   }
+
+  public <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform) {
+    return currentTransform.getOutputs();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index babbe69..697bb58 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -46,6 +46,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -92,11 +93,11 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypedPValue;
 import org.slf4j.Logger;
@@ -257,12 +258,12 @@ public class DataflowPipelineTranslator {
      * A Map from PValues to their output names used by their producer
      * Dataflow steps.
      */
-    private final Map<POutput, String> outputNames = new HashMap<>();
+    private final Map<PValue, String> outputNames = new HashMap<>();
 
     /**
      * A Map from PValues to the Coders used for them.
      */
-    private final Map<POutput, Coder<?>> outputCoders = new HashMap<>();
+    private final Map<PValue, Coder<?>> outputCoders = new HashMap<>();
 
     /**
      * The transform currently being applied.
@@ -366,13 +367,24 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-      return (InputT) getCurrentTransform(transform).getInput();
+    public <InputT extends PInput> List<TaggedPValue> getInputs(PTransform<InputT, ?> transform) {
+      return getCurrentTransform(transform).getInputs();
     }
 
     @Override
-    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-      return (OutputT) getCurrentTransform(transform).getOutput();
+    public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
+      return (InputT) Iterables.getOnlyElement(getInputs(transform)).getValue();
+    }
+
+    @Override
+    public <OutputT extends POutput> List<TaggedPValue> getOutputs(
+        PTransform<?, OutputT> transform) {
+      return getCurrentTransform(transform).getOutputs();
+    }
+
+    @Override
+    public <OutputT extends PValue> OutputT getOutput(PTransform<?, OutputT> transform) {
+      return (OutputT) Iterables.getOnlyElement(getOutputs(transform)).getValue();
     }
 
     @Override
@@ -517,7 +529,7 @@ public class DataflowPipelineTranslator {
      * Records the name of the given output PValue,
      * within its producing transform.
      */
-    private void registerOutputName(POutput value, String name) {
+    private void registerOutputName(PValue value, String name) {
       if (outputNames.put(value, name) != null) {
         throw new IllegalArgumentException(
             "output " + value + " already has a name specified");
@@ -747,8 +759,8 @@ public class DataflowPipelineTranslator {
             StepTranslationContext stepContext = context.addStep(transform, "Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();
-            for (PCollection<T> input : context.getInput(transform).getAll()) {
-              inputs.add(context.asOutputReference(input));
+            for (TaggedPValue input : context.getInputs(transform)) {
+              inputs.add(context.asOutputReference(input.getValue()));
             }
             stepContext.addInput(PropertyNames.INPUTS, inputs);
             stepContext.addOutput(context.getOutput(transform));
@@ -827,7 +839,7 @@ public class DataflowPipelineTranslator {
             translateInputs(
                 stepContext, context.getInput(transform), transform.getSideInputs(), context);
             BiMap<Long, TupleTag<?>> outputMap =
-                translateOutputs(context.getOutput(transform), stepContext);
+                translateOutputs(context.getOutputs(transform), stepContext);
             translateFn(
                 stepContext,
                 transform.getFn(),
@@ -872,8 +884,7 @@ public class DataflowPipelineTranslator {
         Window.Bound.class,
         new TransformTranslator<Bound>() {
           @Override
-          public void translate(
-              Window.Bound transform, TranslationContext context) {
+          public void translate(Window.Bound transform, TranslationContext context) {
             translateHelper(transform, context);
           }
 
@@ -883,7 +894,8 @@ public class DataflowPipelineTranslator {
             stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
             stepContext.addOutput(context.getOutput(transform));
 
-            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
+            WindowingStrategy<?, ?> strategy =
+                context.getOutput(transform).getWindowingStrategy();
             byte[] serializedBytes = serializeToByteArray(strategy);
             String serializedJson = byteArrayToJsonString(serializedBytes);
             assert Arrays.equals(serializedBytes,
@@ -949,13 +961,16 @@ public class DataflowPipelineTranslator {
   }
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(
-      PCollectionTuple outputs,
+      List<TaggedPValue> outputs,
       StepTranslationContext stepContext) {
     ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder();
-    for (Map.Entry<TupleTag<?>, PCollection<?>> entry
-             : outputs.getAll().entrySet()) {
-      TupleTag<?> tag = entry.getKey();
-      PCollection<?> output = entry.getValue();
+    for (TaggedPValue taggedOutput : outputs) {
+      TupleTag<?> tag = taggedOutput.getTag();
+      checkArgument(taggedOutput.getValue() instanceof PCollection,
+          "Non %s returned from Multi-output %s",
+          PCollection.class.getSimpleName(),
+          stepContext);
+      PCollection<?> output = (PCollection<?>) taggedOutput.getValue();
       mapBuilder.put(stepContext.addOutput(output), tag);
     }
     return mapBuilder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 4297a80..6a82672 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
@@ -45,10 +46,14 @@ interface TransformTranslator<TransformT extends PTransform> {
     DataflowPipelineOptions getPipelineOptions();
 
     /** Returns the input of the currently being translated transform. */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
+    <InputT extends PInput> List<TaggedPValue> getInputs(PTransform<InputT, ?> transform);
+
+    <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform);
 
     /** Returns the output of the currently being translated transform. */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
+    <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform);
+
+    <OutputT extends PValue> OutputT getOutput(PTransform<?, OutputT> transform);
 
     /** Returns the full name of the currently being translated transform. */
     String getFullName(PTransform<?, ?> transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index d5d7aa9..36bf129 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -58,10 +58,12 @@ import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -354,7 +356,8 @@ public class DataflowPipelineJobTest {
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+    AppliedPTransform<?, ?, ?> appliedTransform =
+        appliedPTransform(fullName, pTransform, TestPipeline.create());
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
@@ -389,7 +392,8 @@ public class DataflowPipelineJobTest {
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+    AppliedPTransform<?, ?, ?> appliedTransform =
+        appliedPTransform(fullName, pTransform, TestPipeline.create());
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
@@ -426,7 +430,8 @@ public class DataflowPipelineJobTest {
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+    AppliedPTransform<?, ?, ?> appliedTransform =
+        appliedPTransform(fullName, pTransform, TestPipeline.create());
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
@@ -471,18 +476,20 @@ public class DataflowPipelineJobTest {
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
 
+    Pipeline p = TestPipeline.create();
+
     @SuppressWarnings("unchecked")
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform, p);
 
     @SuppressWarnings("unchecked")
     PTransform<PInput, POutput> otherTransform = mock(PTransform.class);
     String otherStepName = "s88";
     String otherFullName = "Spam/Ham/Eggs";
     AppliedPTransform<?, ?, ?> otherAppliedTransform =
-        appliedPTransform(otherFullName, otherTransform);
+        appliedPTransform(otherFullName, otherTransform, p);
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
@@ -542,7 +549,8 @@ public class DataflowPipelineJobTest {
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+    AppliedPTransform<?, ?, ?> appliedTransform =
+        appliedPTransform(fullName, pTransform, TestPipeline.create());
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
@@ -606,7 +614,8 @@ public class DataflowPipelineJobTest {
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
-    AppliedPTransform<?, ?, ?> appliedTransform = appliedPTransform(fullName, pTransform);
+    AppliedPTransform<?, ?, ?> appliedTransform =
+        appliedPTransform(fullName, pTransform, TestPipeline.create());
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
@@ -659,8 +668,10 @@ public class DataflowPipelineJobTest {
   }
 
   private AppliedPTransform<?, ?, ?> appliedPTransform(
-      String fullName, PTransform<PInput, POutput> transform) {
-    return AppliedPTransform.of(fullName, mock(PInput.class), mock(POutput.class), transform);
+      String fullName, PTransform<PInput, POutput> transform, Pipeline p) {
+    PInput input = mock(PInput.class);
+    when(input.getPipeline()).thenReturn(p);
+    return AppliedPTransform.of(fullName, input, mock(POutput.class), transform);
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b062d71/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0ad862d..9096d5a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.collect.Iterables;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -34,9 +35,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -89,24 +89,32 @@ public class EvaluationContext {
     this.currentTransform = transform;
   }
 
-  public <T extends PInput> T getInput(PTransform<T, ?> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-        "can only be called with current transform");
+  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
     @SuppressWarnings("unchecked")
-    T input = (T) currentTransform.getInput();
+    T input = (T) Iterables.getOnlyElement(getInputs(transform)).getValue();
     return input;
   }
 
-  public <T extends POutput> T getOutput(PTransform<?, T> transform) {
+  public List<TaggedPValue> getInputs(PTransform<?, ?> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
+    return currentTransform.getInputs();
+  }
+
+  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
     @SuppressWarnings("unchecked")
-    T output = (T) currentTransform.getOutput();
+    T output = (T) Iterables.getOnlyElement(getOutputs(transform)).getValue();
     return output;
   }
 
-  public void putDataset(PTransform<?, ?> transform, Dataset dataset) {
-    putDataset((PValue) getOutput(transform), dataset);
+  public List<TaggedPValue> getOutputs(PTransform<?, ?> transform) {
+    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
+        "can only be called with current transform");
+    return currentTransform.getOutputs();
+  }
+
+  public void putDataset(PTransform<?, ? extends PValue> transform, Dataset dataset) {
+    putDataset(getOutput(transform), dataset);
   }
 
   public void putDataset(PValue pvalue, Dataset dataset) {
@@ -119,18 +127,18 @@ public class EvaluationContext {
     leaves.add(dataset);
   }
 
-  <T> void putBoundedDatasetFromValues(PTransform<?, ?> transform, Iterable<T> values,
-                                       Coder<T> coder) {
-    datasets.put((PValue) getOutput(transform), new BoundedDataset<>(values, jsc, coder));
+  <T> void putBoundedDatasetFromValues(
+      PTransform<?, ? extends PValue> transform, Iterable<T> values, Coder<T> coder) {
+    datasets.put(getOutput(transform), new BoundedDataset<>(values, jsc, coder));
   }
 
   public <T> void putUnboundedDatasetFromQueue(
-      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
-    datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
+      PTransform<?, ? extends PValue> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
+    datasets.put(getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
   }
 
-  public Dataset borrowDataset(PTransform<?, ?> transform) {
-    return borrowDataset((PValue) getInput(transform));
+  public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
+    return borrowDataset(getInput(transform));
   }
 
   public Dataset borrowDataset(PValue pvalue) {