Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/11 19:46:35 UTC

[1/4] beam git commit: This closes #2430

Repository: beam
Updated Branches:
  refs/heads/master a5a5bf946 -> 7c169a614


This closes #2430


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c169a61
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c169a61
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c169a61

Branch: refs/heads/master
Commit: 7c169a614f339ed829c6d8b6bbc5c4868ff33ef5
Parents: a5a5bf9 0e5737f
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 12:40:46 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 12:40:46 2017 -0700

----------------------------------------------------------------------
 .../FlattenPCollectionTranslator.java           |  13 +-
 .../apex/translation/ParDoTranslator.java       |  13 +-
 .../apex/translation/TranslationContext.java    |  10 +-
 .../DeduplicatedFlattenFactory.java             |  18 ++-
 .../EmptyFlattenAsCreateFactory.java            |   7 +-
 .../core/construction/PTransformMatchers.java   |   5 +-
 .../core/construction/PrimitiveCreate.java      |   7 +-
 .../core/construction/ReplacementOutputs.java   |  63 ++++-----
 .../SingleInputOutputOverrideFactory.java       |  11 +-
 .../UnsupportedOverrideFactory.java             |   8 +-
 .../DeduplicatedFlattenFactoryTest.java         |   6 +-
 .../EmptyFlattenAsCreateFactoryTest.java        |   8 +-
 .../construction/PTransformMatchersTest.java    | 132 +++++++++----------
 .../construction/ReplacementOutputsTest.java    | 109 ++-------------
 .../SingleInputOutputOverrideFactoryTest.java   |   6 +-
 .../UnsupportedOverrideFactoryTest.java         |   7 +-
 .../direct/BoundedReadEvaluatorFactory.java     |   2 +-
 .../beam/runners/direct/DirectGraphVisitor.java |   5 +-
 .../direct/ExecutorServiceParallelExecutor.java |   4 +-
 .../runners/direct/FlattenEvaluatorFactory.java |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |   4 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  14 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  10 +-
 .../direct/ParDoMultiOverrideFactory.java       |   9 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   8 +-
 .../direct/TestStreamEvaluatorFactory.java      |   8 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |   4 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   4 +-
 .../runners/direct/ViewOverrideFactory.java     |   9 +-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../runners/direct/WindowEvaluatorFactory.java  |   2 +-
 .../direct/WriteWithShardingFactory.java        |  10 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   7 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   2 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   2 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |   5 +-
 .../runners/direct/ViewOverrideFactoryTest.java |   2 +-
 .../flink/FlinkBatchTransformTranslators.java   |  36 ++---
 .../flink/FlinkBatchTranslationContext.java     |  11 +-
 .../flink/FlinkStreamingPipelineTranslator.java |   9 +-
 .../FlinkStreamingTransformTranslators.java     |  32 ++---
 .../flink/FlinkStreamingTranslationContext.java |  12 +-
 .../dataflow/BatchStatefulParDoOverrides.java   |  15 +--
 .../dataflow/DataflowPipelineTranslator.java    |  20 +--
 .../beam/runners/dataflow/DataflowRunner.java   |  27 ++--
 .../runners/dataflow/TransformTranslator.java   |   6 +-
 .../dataflow/DataflowPipelineJobTest.java       |   7 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  20 +--
 .../beam/runners/spark/TestSparkRunner.java     |   7 +-
 .../spark/translation/EvaluationContext.java    |  11 +-
 .../spark/translation/TransformTranslator.java  |  25 ++--
 .../streaming/StreamingTransformTranslator.java |  20 +--
 .../sdk/runners/PTransformOverrideFactory.java  |  10 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  80 +++++------
 .../beam/sdk/transforms/AppliedPTransform.java  |  17 ++-
 .../transforms/join/KeyedPCollectionTuple.java  |  12 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   6 +-
 .../apache/beam/sdk/values/PCollectionList.java |  27 ++--
 .../beam/sdk/values/PCollectionTuple.java       |  10 +-
 .../java/org/apache/beam/sdk/values/PDone.java  |   6 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java     |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |   4 +-
 .../org/apache/beam/sdk/values/PValueBase.java  |   6 +-
 .../apache/beam/sdk/values/TaggedPValue.java    |   5 +
 .../java/org/apache/beam/sdk/PipelineTest.java  |  20 ++-
 .../sdk/runners/TransformHierarchyTest.java     |  72 +++++-----
 .../beam/sdk/values/PCollectionListTest.java    |  70 ++++------
 .../beam/sdk/values/PCollectionTupleTest.java   |   5 +-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   |  11 +-
 71 files changed, 526 insertions(+), 658 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: Remove Orderedness of Input, Output expansions

Posted by tg...@apache.org.
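
The diffs below all follow one pattern: AppliedPTransform#getInputs() and #getOutputs() move from an ordered List<TaggedPValue> to an unordered Map<TupleTag<?>, PValue>, so call sites switch from getTag()/getValue() on each list element to keySet()/values()/entrySet() on the map. A minimal sketch of the new call-site shape follows, assuming the post-patch signatures shown in these diffs; the class and method names in the sketch are illustrative only and not part of the patch.

import java.util.Map;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

// Illustrative class and method names; not part of the patch.
class OutputExpansionSketch {
  static void registerOutputs(AppliedPTransform<?, ?, ?> transform) {
    // Post-patch: outputs are an unordered Map keyed by TupleTag,
    // rather than an ordered List<TaggedPValue>.
    Map<TupleTag<?>, PValue> outputs = transform.getOutputs();
    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
      TupleTag<?> tag = output.getKey();   // previously taggedOutput.getTag()
      PValue value = output.getValue();    // previously taggedOutput.getValue()
      if (value instanceof PCollection) {
        // hand the (tag, PCollection) pair to the runner-specific translation context
      }
    }
  }
}
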
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 8b4573f..b44c890 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -101,9 +100,9 @@ public class DirectGraphVisitorTest implements Serializable {
             graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)));
     for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms())  {
       // Root transforms will have no inputs
-      assertThat(root.getInputs(), emptyIterable());
+      assertThat(root.getInputs().entrySet(), emptyIterable());
       assertThat(
-          Iterables.getOnlyElement(root.getOutputs()).getValue(),
+          Iterables.getOnlyElement(root.getOutputs().values()),
           Matchers.<POutput>isOneOf(created, counted, unCounted));
     }
   }
@@ -121,7 +120,7 @@ public class DirectGraphVisitorTest implements Serializable {
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(graph.getProducer(empty)));
     AppliedPTransform<?, ?, ?> onlyRoot = Iterables.getOnlyElement(graph.getRootTransforms());
     assertThat(onlyRoot.getTransform(), Matchers.<PTransform<?, ?>>equalTo(flatten));
-    assertThat(onlyRoot.getInputs(), Matchers.<TaggedPValue>emptyIterable());
+    assertThat(onlyRoot.getInputs().entrySet(), emptyIterable());
     assertThat(onlyRoot.getOutputs(), equalTo(empty.expand()));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index c85b85e..2a94d48 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -162,7 +162,7 @@ public class ParDoEvaluatorTest {
         evaluationContext,
         stepContext,
         transform,
-        ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs()).getValue())
+        ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values()))
             .getWindowingStrategy(),
         fn,
         null /* key */,

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index 946cd69..ecb8130 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -308,7 +308,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         BUNDLE_FACTORY
             .createBundle(
                 (PCollection<KeyedWorkItem<String, KV<String, Integer>>>)
-                    Iterables.getOnlyElement(producingTransform.getInputs()).getValue())
+                    Iterables.getOnlyElement(producingTransform.getInputs().values()))
             .add(gbkOutputElement)
             .commit(Instant.now());
     TransformEvaluator<KeyedWorkItem<String, KV<String, Integer>>> evaluator =

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index fc689fe..0d909c2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -40,8 +40,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -183,7 +184,7 @@ public class TestStreamEvaluatorFactoryTest {
   @Test
   public void overrideFactoryGetInputSucceeds() {
     DirectTestStreamFactory<?> factory = new DirectTestStreamFactory<>(runner);
-    PBegin begin = factory.getInput(Collections.<TaggedPValue>emptyList(), p);
+    PBegin begin = factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), p);
     assertThat(begin.getPipeline(), Matchers.<Pipeline>equalTo(p));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6dcc13c..258cb46 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -107,7 +107,7 @@ public class ViewOverrideFactoryTest implements Serializable {
                   is(false));
               PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
               assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
-              assertThat(node.getInputs(), hasSize(1));
+              assertThat(node.getInputs().entrySet(), hasSize(1));
             }
           }
         });

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 1d6728b..ff9521c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -71,7 +72,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -511,15 +511,15 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      List<TaggedPValue> outputs = context.getOutputs(transform);
+      Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
 
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
       // put the main output at index 0, FlinkMultiOutputDoFnFunction  expects this
       outputMap.put(transform.getMainOutputTag(), 0);
       int count = 1;
-      for (TaggedPValue taggedValue : outputs) {
-        if (!outputMap.containsKey(taggedValue.getTag())) {
-          outputMap.put(taggedValue.getTag(), count++);
+      for (TupleTag<?> tag : outputs.keySet()) {
+        if (!outputMap.containsKey(tag)) {
+          outputMap.put(tag, count++);
         }
       }
 
@@ -528,13 +528,13 @@ class FlinkBatchTransformTranslators {
 
       // collect all output Coders and create a UnionCoder for our tagged outputs
       List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (TaggedPValue taggedValue : outputs) {
+      for (PValue taggedValue : outputs.values()) {
         checkState(
-            taggedValue.getValue() instanceof PCollection,
+            taggedValue instanceof PCollection,
             "Within ParDo, got a non-PCollection output %s of type %s",
-            taggedValue.getValue(),
-            taggedValue.getValue().getClass().getSimpleName());
-        PCollection<?> coll = (PCollection<?>) taggedValue.getValue();
+            taggedValue,
+            taggedValue.getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedValue;
         outputCoders.add(coll.getCoder());
         windowingStrategy = coll.getWindowingStrategy();
       }
@@ -599,11 +599,11 @@ class FlinkBatchTransformTranslators {
 
       transformSideInputs(sideInputs, outputDataSet, context);
 
-      for (TaggedPValue output : outputs) {
+      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
         pruneOutput(
             outputDataSet,
             context,
-            outputMap.get(output.getTag()),
+            outputMap.get(output.getKey()),
             (PCollection) output.getValue());
       }
 
@@ -640,7 +640,7 @@ class FlinkBatchTransformTranslators {
         Flatten.PCollections<T> transform,
         FlinkBatchTranslationContext context) {
 
-      List<TaggedPValue> allInputs = context.getInputs(transform);
+      Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
       DataSet<WindowedValue<T>> result = null;
 
       if (allInputs.isEmpty()) {
@@ -661,13 +661,13 @@ class FlinkBatchTransformTranslators {
                     (Coder<T>) VoidCoder.of(),
                     GlobalWindow.Coder.INSTANCE)));
       } else {
-        for (TaggedPValue taggedPc : allInputs) {
+        for (PValue taggedPc : allInputs.values()) {
           checkArgument(
-              taggedPc.getValue() instanceof PCollection,
+              taggedPc instanceof PCollection,
               "Got non-PCollection input to flatten: %s of type %s",
-              taggedPc.getValue(),
-              taggedPc.getValue().getClass().getSimpleName());
-          PCollection<T> collection = (PCollection<T>) taggedPc.getValue();
+              taggedPc,
+              taggedPc.getClass().getSimpleName());
+          PCollection<T> collection = (PCollection<T>) taggedPc;
           DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
           if (result == null) {
             result = current;

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index cb69575..98dd0fb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.flink;
 
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -134,21 +133,21 @@ class FlinkBatchTranslationContext {
     return new CoderTypeInformation<>(windowedValueCoder);
   }
 
-  List<TaggedPValue> getInputs(PTransform<?, ?> transform) {
+  Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
     return currentTransform.getInputs();
   }
 
   @SuppressWarnings("unchecked")
   <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
   }
 
-  List<TaggedPValue> getOutputs(PTransform<?, ?> transform) {
+  Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
     return currentTransform.getOutputs();
   }
 
   @SuppressWarnings("unchecked")
   <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 8b5637e..70da2b3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -259,14 +259,13 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     }
 
     @Override
-    public PCollection<? extends InputT> getInput(
-        List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue();
+    public PCollection<? extends InputT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+      return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs.values());
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
       return ReplacementOutputs.tagged(outputs, newOutput);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 5c29db2..af157f0 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.runners.core.ElementAndRestriction;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDo;
@@ -79,7 +80,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -420,7 +420,7 @@ class FlinkStreamingTransformTranslators {
         DoFn<InputT, OutputT> doFn,
         PCollection<InputT> input,
         List<PCollectionView<?>> sideInputs,
-        List<TaggedPValue> outputs,
+        Map<TupleTag<?>, PValue> outputs,
         TupleTag<OutputT> mainOutputTag,
         List<TupleTag<?>> sideOutputTags,
         FlinkStreamingTranslationContext context,
@@ -537,8 +537,8 @@ class FlinkStreamingTransformTranslators {
                 }
               });
 
-      for (TaggedPValue output : outputs) {
-        final int outputTag = tagsToLabels.get(output.getTag());
+      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        final int outputTag = tagsToLabels.get(output.getKey());
 
         TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
 
@@ -557,28 +557,28 @@ class FlinkStreamingTransformTranslators {
 
     private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
         TupleTag<?> mainTag,
-        List<TaggedPValue> allTaggedValues) {
+        Map<TupleTag<?>, PValue> allTaggedValues) {
 
       Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
       int count = 0;
       tagToLabelMap.put(mainTag, count++);
-      for (TaggedPValue taggedPValue : allTaggedValues) {
-        if (!tagToLabelMap.containsKey(taggedPValue.getTag())) {
-          tagToLabelMap.put(taggedPValue.getTag(), count++);
+      for (TupleTag<?> key : allTaggedValues.keySet()) {
+        if (!tagToLabelMap.containsKey(key)) {
+          tagToLabelMap.put(key, count++);
         }
       }
       return tagToLabelMap;
     }
 
-    private static UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
+    private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
       List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (TaggedPValue taggedColl : taggedCollections) {
+      for (PValue taggedColl : taggedCollections.values()) {
         checkArgument(
-            taggedColl.getValue() instanceof PCollection,
+            taggedColl instanceof PCollection,
             "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
             PCollection.class.getSimpleName(),
-            taggedColl.getValue().getClass().getSimpleName());
-        PCollection<?> coll = (PCollection<?>) taggedColl.getValue();
+            taggedColl.getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedColl;
         WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
             WindowedValue.getFullCoder(
                 coll.getCoder(),
@@ -1042,7 +1042,7 @@ class FlinkStreamingTransformTranslators {
     public void translateNode(
         Flatten.PCollections<T> transform,
         FlinkStreamingTranslationContext context) {
-      List<TaggedPValue> allInputs = context.getInputs(transform);
+      Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
 
       if (allInputs.isEmpty()) {
 
@@ -1069,8 +1069,8 @@ class FlinkStreamingTransformTranslators {
 
       } else {
         DataStream<T> result = null;
-        for (TaggedPValue input : allInputs) {
-          DataStream<T> current = context.getInputDataStream(input.getValue());
+        for (PValue input : allInputs.values()) {
+          DataStream<T> current = context.getInputDataStream(input);
           result = (result == null) ? current : result.union(current);
         }
         context.setOutputDataStream(context.getOutput(transform), result);

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 3d5b83f..1a943a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,7 +32,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -111,19 +110,20 @@ class FlinkStreamingTranslationContext {
 
   @SuppressWarnings("unchecked")
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
   }
 
-  public <T extends PInput> List<TaggedPValue> getInputs(PTransform<T, ?> transform) {
+  public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {
     return currentTransform.getInputs();
   }
 
   @SuppressWarnings("unchecked")
   public <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
   }
 
-  public <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform) {
+  public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
+      PTransform<?, OutputT> transform) {
     return currentTransform.getOutputs();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index 1d19d64..3ded079 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Iterables;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
@@ -42,7 +41,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 
@@ -93,13 +92,13 @@ public class BatchStatefulParDoOverrides {
     }
 
     @Override
-    public PCollection<KV<K, InputT>> getInput(List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(inputs).getValue();
+    public PCollection<KV<K, InputT>> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+      return (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(inputs.values());
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<OutputT> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<OutputT> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
@@ -116,13 +115,13 @@ public class BatchStatefulParDoOverrides {
     }
 
     @Override
-    public PCollection<KV<K, InputT>> getInput(List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(inputs).getValue();
+    public PCollection<KV<K, InputT>> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+      return (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(inputs.values());
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
       return ReplacementOutputs.tagged(outputs, newOutput);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index ab9df70..1a2e663 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -97,7 +97,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypedPValue;
 import org.slf4j.Logger;
@@ -371,24 +370,25 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public <InputT extends PInput> List<TaggedPValue> getInputs(PTransform<InputT, ?> transform) {
+    public <InputT extends PInput> Map<TupleTag<?>, PValue> getInputs(
+        PTransform<InputT, ?> transform) {
       return getCurrentTransform(transform).getInputs();
     }
 
     @Override
     public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
-      return (InputT) Iterables.getOnlyElement(getInputs(transform)).getValue();
+      return (InputT) Iterables.getOnlyElement(getInputs(transform).values());
     }
 
     @Override
-    public <OutputT extends POutput> List<TaggedPValue> getOutputs(
+    public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
         PTransform<?, OutputT> transform) {
       return getCurrentTransform(transform).getOutputs();
     }
 
     @Override
     public <OutputT extends PValue> OutputT getOutput(PTransform<?, OutputT> transform) {
-      return (OutputT) Iterables.getOnlyElement(getOutputs(transform)).getValue();
+      return (OutputT) Iterables.getOnlyElement(getOutputs(transform).values());
     }
 
     @Override
@@ -758,10 +758,10 @@ public class DataflowPipelineTranslator {
             StepTranslationContext stepContext = context.addStep(transform, "Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();
-            for (TaggedPValue input : context.getInputs(transform)) {
+            for (PValue input : context.getInputs(transform).values()) {
               inputs.add(
                   context.asOutputReference(
-                      input.getValue(), context.getProducer(input.getValue())));
+                      input, context.getProducer(input)));
             }
             stepContext.addInput(PropertyNames.INPUTS, inputs);
             stepContext.addOutput(context.getOutput(transform));
@@ -967,11 +967,11 @@ public class DataflowPipelineTranslator {
   }
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(
-      List<TaggedPValue> outputs,
+      Map<TupleTag<?>, PValue> outputs,
       StepTranslationContext stepContext) {
     ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder();
-    for (TaggedPValue taggedOutput : outputs) {
-      TupleTag<?> tag = taggedOutput.getTag();
+    for (Map.Entry<TupleTag<?>, PValue> taggedOutput : outputs.entrySet()) {
+      TupleTag<?> tag = taggedOutput.getKey();
       checkArgument(taggedOutput.getValue() instanceof PCollection,
           "Non %s returned from Multi-output %s",
           PCollection.class.getSimpleName(),

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f789769..9b993f4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -127,7 +127,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -450,13 +450,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
       return p.begin();
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<T> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
@@ -760,7 +760,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               if (node.getTransform() instanceof View.AsMap
                   || node.getTransform() instanceof View.AsMultimap) {
                 PCollection<KV<?, ?>> input =
-                    (PCollection<KV<?, ?>>) Iterables.getOnlyElement(node.getInputs()).getValue();
+                    (PCollection<KV<?, ?>>) Iterables.getOnlyElement(node.getInputs().values());
                 KvCoder<?, ?> inputCoder = (KvCoder) input.getCoder();
                 try {
                   inputCoder.getKeyCoder().verifyDeterministic();
@@ -825,13 +825,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<T> getInput(List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<T>) Iterables.getOnlyElement(inputs).getValue();
+    public PCollection<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+      return (PCollection<T>) Iterables.getOnlyElement(inputs.values());
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PDone newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
       return Collections.emptyMap();
     }
   }
@@ -1317,13 +1317,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public PCollection<KV<K, Iterable<InputT>>> getInput(
-        List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<KV<K, Iterable<InputT>>>) Iterables.getOnlyElement(inputs).getValue();
+        Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+      return (PCollection<KV<K, Iterable<InputT>>>) Iterables.getOnlyElement(inputs.values());
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<KV<K, OutputT>> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<KV<K, OutputT>> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
@@ -1343,12 +1343,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<T> getInput(List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<T>) Iterables.getOnlyElement(inputs).getValue();
+    public PCollection<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+      return (PCollection<T>) Iterables.getOnlyElement(inputs.values());
     }
 
     @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, PDone newOutput) {
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
       return Collections.emptyMap();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index e020e83..52b3a31 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
@@ -47,12 +47,12 @@ interface TransformTranslator<TransformT extends PTransform> {
     DataflowPipelineOptions getPipelineOptions();
 
     /** Returns the input of the currently being translated transform. */
-    <InputT extends PInput> List<TaggedPValue> getInputs(PTransform<InputT, ?> transform);
+    <InputT extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<InputT, ?> transform);
 
     <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform);
 
     /** Returns the output of the currently being translated transform. */
-    <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform);
+    <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(PTransform<?, OutputT> transform);
 
     <OutputT extends PValue> OutputT getOutput(PTransform<?, OutputT> transform);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index e3d2e4e..e7f2b48 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -72,7 +72,8 @@ import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
@@ -689,8 +690,8 @@ public class DataflowPipelineJobTest {
     when(input.getPipeline()).thenReturn(p);
     return AppliedPTransform.of(
         fullName,
-        Collections.<TaggedPValue>emptyList(),
-        Collections.<TaggedPValue>emptyList(),
+        Collections.<TupleTag<?>, PValue>emptyMap(),
+        Collections.<TupleTag<?>, PValue>emptyMap(),
         transform,
         p);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 5b4f73e..97487f3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -55,7 +56,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
@@ -315,8 +316,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       // The goal is to detect the PCollections accessed more than one time, and so enable cache
       // on the underlying RDDs or DStreams.
 
-      for (TaggedPValue input : node.getInputs()) {
-        PValue value = input.getValue();
+      for (PValue value : node.getInputs().values()) {
         if (value instanceof PCollection) {
           long count = 1L;
           if (ctxt.getCacheCandidates().get(value) != null) {
@@ -362,7 +362,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       if (node.getInputs().size() != 1) {
         return false;
       }
-      PValue input = Iterables.getOnlyElement(node.getInputs()).getValue();
+      PValue input = Iterables.getOnlyElement(node.getInputs().values());
       if (!(input instanceof PCollection)
           || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
         return false;
@@ -420,14 +420,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       //--- determine if node is bounded/unbounded.
       // usually, the input determines if the PCollection to apply the next transformation to
       // is BOUNDED or UNBOUNDED, meaning RDD/DStream.
-      Collection<TaggedPValue> pValues;
+      Map<TupleTag<?>, PValue> pValues;
       if (node.getInputs().isEmpty()) {
         // in case of a PBegin, it's the output.
         pValues = node.getOutputs();
       } else {
         pValues = node.getInputs();
       }
-      PCollection.IsBounded isNodeBounded = isBoundedCollection(pValues);
+      PCollection.IsBounded isNodeBounded = isBoundedCollection(pValues.values());
       // translate accordingly.
       LOG.debug("Translating {} as {}", transform, isNodeBounded);
       return isNodeBounded.equals(PCollection.IsBounded.BOUNDED)
@@ -435,15 +435,15 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           : translator.translateUnbounded(transformClass);
     }
 
-    protected PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
+    protected PCollection.IsBounded isBoundedCollection(Collection<PValue> pValues) {
       // anything that is not a PCollection, is BOUNDED.
       // For PCollections:
       // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
       // while BOUNDED + UNBOUNDED = UNBOUNDED.
       PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
-      for (TaggedPValue pValue : pValues) {
-        if (pValue.getValue() instanceof PCollection) {
-          isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded());
+      for (PValue pValue : pValues) {
+        if (pValue instanceof PCollection) {
+          isBounded = isBounded.and(((PCollection) pValue).isBounded());
         } else {
           isBounded = isBounded.and(PCollection.IsBounded.BOUNDED);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index fcc00f9..aacb942 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -28,7 +28,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
@@ -53,7 +52,7 @@ import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -251,13 +250,13 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
       }
 
       @Override
-      public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+      public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
         return p.begin();
       }
 
       @Override
       public Map<PValue, ReplacementOutput> mapOutputs(
-          List<TaggedPValue> outputs, PCollection<T> newOutput) {
+          Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
         return ReplacementOutputs.singleton(outputs, newOutput);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 643749d..838c504 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -37,7 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -96,11 +95,11 @@ public class EvaluationContext {
 
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
     @SuppressWarnings("unchecked")
-    T input = (T) Iterables.getOnlyElement(getInputs(transform)).getValue();
+    T input = (T) Iterables.getOnlyElement(getInputs(transform).values());
     return input;
   }
 
-  public <T> List<TaggedPValue> getInputs(PTransform<?, ?> transform) {
+  public <T> Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     return currentTransform.getInputs();
@@ -108,11 +107,11 @@ public class EvaluationContext {
 
   public <T extends PValue> T getOutput(PTransform<?, T> transform) {
     @SuppressWarnings("unchecked")
-    T output = (T) Iterables.getOnlyElement(getOutputs(transform)).getValue();
+    T output = (T) Iterables.getOnlyElement(getOutputs(transform).values());
     return output;
   }
 
-  public List<TaggedPValue> getOutputs(PTransform<?, ?> transform) {
+  public Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     return currentTransform.getOutputs();

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7894c4e..c2a8b06 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -26,9 +26,10 @@ import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectS
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -61,7 +62,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -83,19 +84,21 @@ public final class TransformTranslator {
       @SuppressWarnings("unchecked")
       @Override
       public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
-        List<TaggedPValue> pcs = context.getInputs(transform);
+        Collection<PValue> pcs = context.getInputs(transform).values();
         JavaRDD<WindowedValue<T>> unionRDD;
         if (pcs.size() == 0) {
           unionRDD = context.getSparkContext().emptyRDD();
         } else {
           JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
-          for (int i = 0; i < rdds.length; i++) {
+          int index = 0;
+          for (PValue pc : pcs) {
             checkArgument(
-                pcs.get(i).getValue() instanceof PCollection,
+                pc instanceof PCollection,
                 "Flatten had non-PCollection value in input: %s of type %s",
-                pcs.get(i).getValue(),
-                pcs.get(i).getValue().getClass().getSimpleName());
-            rdds[i] = ((BoundedDataset<T>) context.borrowDataset(pcs.get(i).getValue())).getRDD();
+                pc,
+                pc.getClass().getSimpleName());
+            rdds[index] = ((BoundedDataset<T>) context.borrowDataset(pc)).getRDD();
+            index++;
           }
           unionRDD = context.getSparkContext().union(rdds);
         }
@@ -360,15 +363,15 @@ public final class TransformTranslator {
                     transform.getMainOutputTag(),
                     TranslationUtils.getSideInputs(transform.getSideInputs(), context),
                     windowingStrategy));
-        List<TaggedPValue> outputs = context.getOutputs(transform);
+        Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
         if (outputs.size() > 1) {
           // cache the RDD if we're going to filter it more than once.
           all.cache();
         }
-        for (TaggedPValue output : outputs) {
+        for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
           @SuppressWarnings("unchecked")
           JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
-              all.filter(new TranslationUtils.TupleTagFilter(output.getTag()));
+              all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
           @SuppressWarnings("unchecked")
           // Object is the best we can do since different outputs can have different tags
           JavaRDD<WindowedValue<Object>> values =

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index d4c6c9d..65892d2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -77,7 +77,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
@@ -191,19 +191,19 @@ public final class StreamingTransformTranslator {
       @SuppressWarnings("unchecked")
       @Override
       public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
-        List<TaggedPValue> pcs = context.getInputs(transform);
+        Map<TupleTag<?>, PValue> pcs = context.getInputs(transform);
         // since this is a streaming pipeline, at least one of the PCollections to "flatten" are
         // unbounded, meaning it represents a DStream.
         // So we could end up with an unbounded unified DStream.
         final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
         final List<Integer> streamingSources = new ArrayList<>();
-        for (TaggedPValue pv : pcs) {
+        for (PValue pv : pcs.values()) {
           checkArgument(
-              pv.getValue() instanceof PCollection,
+              pv instanceof PCollection,
               "Flatten had non-PCollection value in input: %s of type %s",
-              pv.getValue(),
-              pv.getValue().getClass().getSimpleName());
-          PCollection<T> pcol = (PCollection<T>) pv.getValue();
+              pv,
+              pv.getClass().getSimpleName());
+          PCollection<T> pcol = (PCollection<T>) pv;
           Dataset dataset = context.borrowDataset(pcol);
           if (dataset instanceof UnboundedDataset) {
             UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
@@ -416,15 +416,15 @@ public final class StreamingTransformTranslator {
                               windowingStrategy));
                     }
                   });
-          List<TaggedPValue> outputs = context.getOutputs(transform);
+          Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
           if (outputs.size() > 1) {
             // cache the DStream if we're going to filter it more than once.
             all.cache();
           }
-          for (TaggedPValue output : outputs) {
+          for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
             @SuppressWarnings("unchecked")
             JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered =
-                all.filter(new TranslationUtils.TupleTagFilter(output.getTag()));
+                all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
             @SuppressWarnings("unchecked")
             // Object is the best we can do since different outputs can have different tags
             JavaDStream<WindowedValue<Object>> values =

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
index e2b6009..57cba50 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
@@ -20,7 +20,6 @@
 package org.apache.beam.sdk.runners;
 
 import com.google.auto.value.AutoValue;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -30,6 +29,7 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Produces {@link PipelineRunner}-specific overrides of {@link PTransform PTransforms}, and
@@ -48,17 +48,15 @@ public interface PTransformOverrideFactory<
   /**
    * Returns the composite type that replacement transforms consumed from an equivalent expansion.
    */
-  InputT getInput(List<TaggedPValue> inputs, Pipeline p);
+  InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p);
 
   /**
    * Returns a {@link Map} from the expanded values in {@code newOutput} to the values produced by
    * the original transform.
    */
-  Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput);
+  Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, OutputT newOutput);
 
-  /**
-   * A mapping between original {@link TaggedPValue} outputs and their replacements.
-   */
+  /** A mapping between original {@link TaggedPValue} outputs and their replacements. */
   @AutoValue
   abstract class ReplacementOutput {
     public static ReplacementOutput of(TaggedPValue original, TaggedPValue replacement) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 972cb5b..18bf2e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,7 +42,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,8 +104,8 @@ public class TransformHierarchy {
         "Replacing a node when the graph has an unexpanded input. This is an SDK bug.");
     Node replacement =
         new Node(existing.getEnclosingNode(), transform, existing.getFullName(), input);
-    for (TaggedPValue output : existing.getOutputs()) {
-      Node producer = producers.get(output.getValue());
+    for (PValue output : existing.getOutputs().values()) {
+      Node producer = producers.get(output);
       boolean producedInExisting = false;
       do {
         if (producer.equals(existing)) {
@@ -114,13 +115,13 @@ public class TransformHierarchy {
         }
       } while (!producedInExisting && !producer.isRootNode());
       if (producedInExisting) {
-        producers.remove(output.getValue());
+        producers.remove(output);
         LOG.debug("Removed producer for value {} as it is part of a replaced composite {}",
-            output.getValue(),
+            output,
             existing.getFullName());
       } else {
         LOG.debug(
-            "Value {} not produced in existing node {}", output.getValue(), existing.getFullName());
+            "Value {} not produced in existing node {}", output, existing.getFullName());
       }
     }
     existing.getEnclosingNode().replaceChild(existing, replacement);
@@ -137,18 +138,18 @@ public class TransformHierarchy {
    */
   public void finishSpecifyingInput() {
     // Inputs must be completely specified before they are consumed by a transform.
-    for (TaggedPValue inputValue : current.getInputs()) {
-      Node producerNode = getProducer(inputValue.getValue());
-      PInput input = producerInput.remove(inputValue.getValue());
-      inputValue.getValue().finishSpecifying(input, producerNode.getTransform());
+    for (PValue inputValue : current.getInputs().values()) {
+      Node producerNode = getProducer(inputValue);
+      PInput input = producerInput.remove(inputValue);
+      inputValue.finishSpecifying(input, producerNode.getTransform());
       checkState(
-          producers.get(inputValue.getValue()) != null,
+          producers.get(inputValue) != null,
           "Producer unknown for input %s",
           inputValue);
       checkState(
-          producers.get(inputValue.getValue()) != null,
+          producers.get(inputValue) != null,
           "Producer unknown for input %s",
-          inputValue.getValue());
+          inputValue);
     }
   }
 
@@ -163,12 +164,12 @@ public class TransformHierarchy {
    * nodes.
    */
   public void setOutput(POutput output) {
-    for (TaggedPValue value : output.expand()) {
-      if (!producers.containsKey(value.getValue())) {
-        producers.put(value.getValue(), current);
+    for (PValue value : output.expand().values()) {
+      if (!producers.containsKey(value)) {
+        producers.put(value, current);
       }
-      value.getValue().finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
-      producerInput.put(value.getValue(), unexpandedInputs.get(current));
+      value.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
+      producerInput.put(value, unexpandedInputs.get(current));
     }
     output.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
     current.setOutput(output);
@@ -241,11 +242,11 @@ public class TransformHierarchy {
     private final List<Node> parts = new ArrayList<>();
 
     // Input to the transform, in expanded form.
-    private final List<TaggedPValue> inputs;
+    private final Map<TupleTag<?>, PValue> inputs;
 
     // TODO: track which outputs need to be exported to parent.
     // Output of the transform, in expanded form.
-    private List<TaggedPValue> outputs;
+    private Map<TupleTag<?>, PValue> outputs;
 
     @VisibleForTesting
     boolean finishedSpecifying = false;
@@ -269,7 +270,7 @@ public class TransformHierarchy {
       this.enclosingNode = enclosingNode;
       this.transform = transform;
       this.fullName = fullName;
-      this.inputs = input == null ? Collections.<TaggedPValue>emptyList() : input.expand();
+      this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap() : input.expand();
     }
 
     /**
@@ -333,8 +334,8 @@ public class TransformHierarchy {
     private boolean returnsOthersOutput() {
       PTransform<?, ?> transform = getTransform();
       if (outputs != null) {
-        for (TaggedPValue outputValue : outputs) {
-          if (!getProducer(outputValue.getValue()).getTransform().equals(transform)) {
+        for (PValue outputValue : outputs.values()) {
+          if (!getProducer(outputValue).getTransform().equals(transform)) {
             return true;
           }
         }
@@ -351,8 +352,8 @@ public class TransformHierarchy {
     }
 
     /** Returns the transform input, in unexpanded form. */
-    public List<TaggedPValue> getInputs() {
-      return inputs == null ? Collections.<TaggedPValue>emptyList() : inputs;
+    public Map<TupleTag<?>, PValue> getInputs() {
+      return inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
     }
 
     /**
@@ -368,8 +369,8 @@ public class TransformHierarchy {
       // Validate that a primitive transform produces only primitive output, and a composite
       // transform does not produce primitive output.
       Set<Node> outputProducers = new HashSet<>();
-      for (TaggedPValue outputValue : output.expand()) {
-        outputProducers.add(getProducer(outputValue.getValue()));
+      for (PValue outputValue : output.expand().values()) {
+        outputProducers.add(getProducer(outputValue));
       }
       if (outputProducers.contains(this)) {
         if (!parts.isEmpty() || outputProducers.size() > 1) {
@@ -412,8 +413,8 @@ public class TransformHierarchy {
         // Replace the outputs of the component nodes
         component.replaceOutputs(originalToReplacement);
       }
-      List<TaggedPValue> newOutputs = new ArrayList<>(outputs.size());
-      for (TaggedPValue output : outputs) {
+      ImmutableMap.Builder<TupleTag<?>, PValue> newOutputsBuilder = ImmutableMap.builder();
+      for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
         ReplacementOutput mapping = originalToReplacement.get(output.getValue());
         if (mapping != null) {
           if (this.equals(producers.get(mapping.getReplacement().getValue()))) {
@@ -429,11 +430,12 @@ public class TransformHierarchy {
               "Replacing output {} with original {}",
               mapping.getReplacement(),
               mapping.getOriginal());
-          newOutputs.add(TaggedPValue.of(output.getTag(), mapping.getOriginal().getValue()));
+          newOutputsBuilder.put(output.getKey(), mapping.getOriginal().getValue());
         } else {
-          newOutputs.add(output);
+          newOutputsBuilder.put(output);
         }
       }
+      ImmutableMap<TupleTag<?>, PValue> newOutputs = newOutputsBuilder.build();
       checkState(
           outputs.size() == newOutputs.size(),
           "Number of outputs must be stable across replacement");
@@ -441,8 +443,8 @@ public class TransformHierarchy {
     }
 
     /** Returns the transform output, in expanded form. */
-    public List<TaggedPValue> getOutputs() {
-      return outputs == null ? Collections.<TaggedPValue>emptyList() : outputs;
+    public Map<TupleTag<?>, PValue> getOutputs() {
+      return outputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : outputs;
     }
 
     /**
@@ -466,9 +468,9 @@ public class TransformHierarchy {
 
       if (!isRootNode()) {
         // Visit inputs.
-        for (TaggedPValue inputValue : inputs) {
-          if (visitedValues.add(inputValue.getValue())) {
-            visitor.visitValue(inputValue.getValue(), getProducer(inputValue.getValue()));
+        for (PValue inputValue : inputs.values()) {
+          if (visitedValues.add(inputValue)) {
+            visitor.visitValue(inputValue, getProducer(inputValue));
           }
         }
       }
@@ -489,9 +491,9 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         checkNotNull(outputs, "Outputs for non-root node %s are null", getFullName());
         // Visit outputs.
-        for (TaggedPValue pValue : outputs) {
-          if (visitedValues.add(pValue.getValue())) {
-            visitor.visitValue(pValue.getValue(), this);
+        for (PValue pValue : outputs.values()) {
+          if (visitedValues.add(pValue)) {
+            visitor.visitValue(pValue, this);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index e78d795..8d99a62 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -18,11 +18,12 @@
 package org.apache.beam.sdk.transforms;
 
 import com.google.auto.value.AutoValue;
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Represents the application of a {@link PTransform} to a specific input to produce
@@ -41,12 +42,14 @@ public abstract class AppliedPTransform<
   // To prevent extension outside of this package.
   AppliedPTransform() {}
 
-  public static <InputT extends PInput, OutputT extends POutput,
+  public static <
+          InputT extends PInput,
+          OutputT extends POutput,
           TransformT extends PTransform<? super InputT, OutputT>>
       AppliedPTransform<InputT, OutputT, TransformT> of(
           String fullName,
-          List<TaggedPValue> input,
-          List<TaggedPValue> output,
+          Map<TupleTag<?>, PValue> input,
+          Map<TupleTag<?>, PValue> output,
           TransformT transform,
           Pipeline p) {
     return new AutoValue_AppliedPTransform<InputT, OutputT, TransformT>(
@@ -55,9 +58,9 @@ public abstract class AppliedPTransform<
 
   public abstract String getFullName();
 
-  public abstract List<TaggedPValue> getInputs();
+  public abstract Map<TupleTag<?>, PValue> getInputs();
 
-  public abstract List<TaggedPValue> getOutputs();
+  public abstract Map<TupleTag<?>, PValue> getOutputs();
 
   public abstract TransformT getTransform();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index b373909..2e7dd01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.transforms.join;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -27,7 +29,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
@@ -120,12 +122,12 @@ public class KeyedPCollectionTuple<K> implements PInput {
    * any tag-specific information.
    */
   @Override
-  public List<TaggedPValue> expand() {
-    List<TaggedPValue> retval = new ArrayList<>();
+  public Map<TupleTag<?>, PValue> expand() {
+    ImmutableMap.Builder<TupleTag<?>, PValue> retval = ImmutableMap.builder();
     for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
-      retval.add(TaggedPValue.of(taggedPCollection.tupleTag, taggedPCollection.pCollection));
+      retval.put(taggedPCollection.tupleTag, taggedPCollection.pCollection);
     }
-    return retval;
+    return retval.build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
index 2ba0f1c..04d1bdb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.values;
 
 import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO.Read;
 import org.apache.beam.sdk.transforms.Create;
@@ -64,9 +64,9 @@ public class PBegin implements PInput {
   }
 
   @Override
-  public List<TaggedPValue> expand() {
+  public Map<TupleTag<?>, PValue> expand() {
     // A PBegin contains no PValues.
-    return Collections.emptyList();
+    return Collections.emptyMap();
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
index dcb64a8..7b45deb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
@@ -18,9 +18,10 @@
 package org.apache.beam.sdk.values;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -116,7 +117,7 @@ public class PCollectionList<T> implements PInput, POutput {
     return new PCollectionList<>(pipeline,
         ImmutableList.<TaggedPValue>builder()
             .addAll(pcollections)
-            .add(Iterables.getOnlyElement(pc.expand()))
+            .add(TaggedPValue.of(new TupleTag<T>(), pc))
             .build());
   }
 
@@ -133,10 +134,9 @@ public class PCollectionList<T> implements PInput, POutput {
     builder.addAll(pcollections);
     for (PCollection<T> pc : pcs) {
       if (pc.getPipeline() != pipeline) {
-        throw new IllegalArgumentException(
-            "PCollections come from different Pipelines");
+        throw new IllegalArgumentException("PCollections come from different Pipelines");
       }
-      builder.add(Iterables.getOnlyElement(pc.expand()));
+      builder.add(TaggedPValue.of(new TupleTag<T>(), pc));
     }
     return new PCollectionList<>(pipeline, builder.build());
   }
@@ -200,7 +200,10 @@ public class PCollectionList<T> implements PInput, POutput {
   // Internal details below here.
 
   final Pipeline pipeline;
-  // ImmutableMap has a defined iteration order.
+  /**
+   * The {@link PCollection PCollections} contained by this {@link PCollectionList}, and an
+   * arbitrary tags associated with each.
+   */
   final List<TaggedPValue> pcollections;
 
   PCollectionList(Pipeline pipeline) {
@@ -218,8 +221,12 @@ public class PCollectionList<T> implements PInput, POutput {
   }
 
   @Override
-  public List<TaggedPValue> expand() {
-    return pcollections;
+  public Map<TupleTag<?>, PValue> expand() {
+    ImmutableMap.Builder<TupleTag<?>, PValue> expanded = ImmutableMap.builder();
+    for (TaggedPValue tagged : pcollections) {
+      expanded.put(tagged.getTag(), tagged.getValue());
+    }
+    return expanded.build();
   }
 
   @Override
@@ -244,11 +251,11 @@ public class PCollectionList<T> implements PInput, POutput {
       return false;
     }
     PCollectionList that = (PCollectionList) other;
-    return this.pipeline.equals(that.pipeline) && this.pcollections.equals(that.pcollections);
+    return this.pipeline.equals(that.pipeline) && this.getAll().equals(that.getAll());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(this.pipeline, this.pcollections);
+    return Objects.hash(this.pipeline, this.getAll());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index d61db51..0ab26ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -17,11 +17,9 @@
  */
 package org.apache.beam.sdk.values;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
@@ -234,12 +232,8 @@ public class PCollectionTuple implements PInput, POutput {
   }
 
   @Override
-  public List<TaggedPValue> expand() {
-    ImmutableList.Builder<TaggedPValue> values = ImmutableList.builder();
-    for (Map.Entry<TupleTag<?>, PCollection<?>> entry : pcollectionMap.entrySet()) {
-      values.add(TaggedPValue.of(entry.getKey(), entry.getValue()));
-    }
-    return values.build();
+  public Map<TupleTag<?>, PValue> expand() {
+    return ImmutableMap.<TupleTag<?>, PValue>copyOf(pcollectionMap);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
index b4a3025..eb5db20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.values;
 
 import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
 
@@ -36,9 +36,9 @@ public class PDone extends POutputValueBase {
   }
 
   @Override
-  public List<TaggedPValue> expand() {
+  public Map<TupleTag<?>, PValue> expand() {
     // A PDone contains no PValues.
-    return Collections.emptyList();
+    return Collections.emptyMap();
   }
 
   private PDone(Pipeline pipeline) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
index 30d4297..caf7812 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 
 /**
@@ -43,5 +43,5 @@ public interface PInput {
    *
    * <p>Not intended to be invoked directly by user code.
    */
-  List<TaggedPValue> expand();
+  Map<TupleTag<?>, PValue> expand();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
index 062f565..bb01beb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -45,7 +45,7 @@ public interface POutput {
    *
    * <p>Not intended to be invoked directly by user code.
    */
-  List<TaggedPValue> expand();
+  Map<TupleTag<?>, PValue> expand();
 
   /**
    * Records that this {@code POutput} is an output of the given

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
index 4c62972..06546aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -37,7 +37,7 @@ public interface PValue extends POutput, PInput {
    * never appropriate.
    */
   @Deprecated
-  List<TaggedPValue> expand();
+  Map<TupleTag<?>, PValue> expand();
 
   /**
    * After building, finalizes this {@code PValue} to make it ready for being used as an input to a

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 8778597..91ee392 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.values;
 
 import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -133,8 +133,8 @@ public abstract class PValueBase extends POutputValueBase implements PValue {
   }
 
   @Override
-  public final List<TaggedPValue> expand() {
-    return Collections.singletonList(TaggedPValue.of(tag, this));
+  public final Map<TupleTag<?>, PValue> expand() {
+    return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
index 458d16f..3b4d599 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
@@ -20,6 +20,7 @@
 package org.apache.beam.sdk.values;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
 
 /**
  * A (TupleTag, PValue) pair used in the expansion of a {@link PInput} or {@link POutput}.
@@ -30,6 +31,10 @@ public abstract class TaggedPValue {
     return new AutoValue_TaggedPValue(tag, value);
   }
 
+  public static TaggedPValue ofExpandedValue(PValue value) {
+    return of(Iterables.getOnlyElement(value.expand().keySet()), value);
+  }
+
   /**
    * Returns the local tag associated with the {@link PValue}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index efe8db4..0a5746b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -393,17 +393,21 @@ public class PipelineTest {
     }
 
     @Override
-    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
       return p.begin();
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<Long> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<Long> newOutput) {
+      Map.Entry<TupleTag<?>, PValue> original = Iterables.getOnlyElement(outputs.entrySet());
+      Map.Entry<TupleTag<?>, PValue> replacement =
+          Iterables.getOnlyElement(newOutput.expand().entrySet());
       return Collections.<PValue, ReplacementOutput>singletonMap(
           newOutput,
           ReplacementOutput.of(
-              Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand())));
+              TaggedPValue.of(original.getKey(), original.getValue()),
+              TaggedPValue.of(replacement.getKey(), replacement.getValue())));
     }
   }
   static class UnboundedCountingInputOverride
@@ -415,17 +419,21 @@ public class PipelineTest {
     }
 
     @Override
-    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
       return p.begin();
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<Long> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<Long> newOutput) {
+      Map.Entry<TupleTag<?>, PValue> original = Iterables.getOnlyElement(outputs.entrySet());
+      Map.Entry<TupleTag<?>, PValue> replacement =
+          Iterables.getOnlyElement(newOutput.expand().entrySet());
       return Collections.<PValue, ReplacementOutput>singletonMap(
           newOutput,
           ReplacementOutput.of(
-              Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand())));
+              TaggedPValue.of(original.getKey(), original.getValue()),
+              TaggedPValue.of(replacement.getKey(), replacement.getValue())));
     }
   }
 }


[2/4] beam git commit: Remove Orderedness of Input, Output expansions

Posted by tg...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index f62b320..3638fc8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -17,21 +17,18 @@
  */
 package org.apache.beam.sdk.runners;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
@@ -224,11 +221,13 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(hierarchy.getCurrent(), equalTo(replacement));
     hierarchy.setOutput(replacementOutput);
 
-    TaggedPValue taggedOriginal = Iterables.getOnlyElement(originalOutput.expand());
-    TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacementOutput.expand());
+    TaggedPValue taggedReplacement = TaggedPValue.ofExpandedValue(replacementOutput);
     Map<PValue, ReplacementOutput> replacementOutputs =
         Collections.<PValue, ReplacementOutput>singletonMap(
-            replacementOutput, ReplacementOutput.of(taggedOriginal, taggedReplacement));
+            replacementOutput,
+            ReplacementOutput.of(
+                TaggedPValue.ofExpandedValue(originalOutput),
+                taggedReplacement));
     hierarchy.replaceOutputs(replacementOutputs);
 
     assertThat(replacement.getInputs(), equalTo(original.getInputs()));
@@ -238,8 +237,9 @@ public class TransformHierarchyTest implements Serializable {
         replacement.getTransform(), Matchers.<PTransform<?, ?>>equalTo(replacementTransform));
     // THe tags of the replacement transform are matched to the appropriate PValues of the original
     assertThat(
-        replacement.getOutputs(),
-        contains(TaggedPValue.of(taggedReplacement.getTag(), taggedOriginal.getValue())));
+        replacement.getOutputs().keySet(),
+        Matchers.<TupleTag<?>>contains(taggedReplacement.getTag()));
+    assertThat(replacement.getOutputs().values(), Matchers.<PValue>contains(originalOutput));
     hierarchy.popNode();
   }
 
@@ -294,21 +294,23 @@ public class TransformHierarchyTest implements Serializable {
     hierarchy.popNode();
     hierarchy.setOutput(replacementOutput.get(longs));
 
-    TaggedPValue originalLongs = Iterables.getOnlyElement(output.expand());
-    TaggedPValue replacementLongs = Iterables.getOnlyElement(replacementOutput.expand());
+    Entry<TupleTag<?>, PValue>
+        replacementLongs = Iterables.getOnlyElement(replacementOutput.expand().entrySet());
     hierarchy.replaceOutputs(
         Collections.<PValue, ReplacementOutput>singletonMap(
-            replacementOutput.get(longs), ReplacementOutput.of(originalLongs, replacementLongs)));
+            replacementOutput.get(longs),
+            ReplacementOutput.of(
+                TaggedPValue.ofExpandedValue(output),
+                TaggedPValue.of(replacementLongs.getKey(), replacementLongs.getValue()))));
 
     assertThat(
-        replacementParNode.getOutputs(),
-        contains(TaggedPValue.of(replacementLongs.getTag(), originalLongs.getValue())));
+        replacementParNode.getOutputs().keySet(),
+        Matchers.<TupleTag<?>>contains(replacementLongs.getKey()));
+    assertThat(replacementParNode.getOutputs().values(), Matchers.<PValue>contains(output));
     assertThat(
-        compositeNode.getOutputs(),
-        contains(
-            TaggedPValue.of(
-                Iterables.getOnlyElement(replacementOutput.get(longs).expand()).getTag(),
-                originalLongs.getValue())));
+        compositeNode.getOutputs().keySet(),
+        equalTo(replacementOutput.get(longs).expand().keySet()));
+    assertThat(compositeNode.getOutputs().values(), Matchers.<PValue>contains(output));
     hierarchy.popNode();
   }
 
@@ -340,10 +342,10 @@ public class TransformHierarchyTest implements Serializable {
     TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
     hierarchy.finishSpecifyingInput();
     assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
-    assertThat(compositeNode.getInputs(), Matchers.emptyIterable());
+    assertThat(compositeNode.getInputs().entrySet(), Matchers.empty());
     assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
     // Not yet set
-    assertThat(compositeNode.getOutputs(), Matchers.emptyIterable());
+    assertThat(compositeNode.getOutputs().entrySet(), Matchers.emptyIterable());
     assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true));
 
     TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
@@ -351,16 +353,14 @@ public class TransformHierarchyTest implements Serializable {
     hierarchy.finishSpecifyingInput();
     hierarchy.setOutput(created);
     hierarchy.popNode();
-    assertThat(
-        fromTaggedValues(primitiveNode.getOutputs()), Matchers.<PValue>containsInAnyOrder(created));
-    assertThat(primitiveNode.getInputs(), Matchers.<TaggedPValue>emptyIterable());
+    assertThat(primitiveNode.getOutputs().values(), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(primitiveNode.getInputs().entrySet(), Matchers.emptyIterable());
     assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read));
     assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode));
 
     hierarchy.setOutput(created);
     // The composite is listed as outputting a PValue created by the contained primitive
-    assertThat(
-        fromTaggedValues(compositeNode.getOutputs()), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(compositeNode.getOutputs().values(), Matchers.<PValue>containsInAnyOrder(created));
     // The producer of that PValue is still the primitive in which it is first output
     assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();
@@ -457,11 +457,14 @@ public class TransformHierarchyTest implements Serializable {
     hierarchy.popNode();
     hierarchy.setOutput(replacementOutput.get(longs));
 
-    TaggedPValue originalLongs = Iterables.getOnlyElement(output.expand());
-    TaggedPValue replacementLongs = Iterables.getOnlyElement(replacementOutput.expand());
+    Entry<TupleTag<?>, PValue> replacementLongs =
+        Iterables.getOnlyElement(replacementOutput.expand().entrySet());
     hierarchy.replaceOutputs(
         Collections.<PValue, ReplacementOutput>singletonMap(
-            replacementOutput.get(longs), ReplacementOutput.of(originalLongs, replacementLongs)));
+            replacementOutput.get(longs),
+            ReplacementOutput.of(
+                TaggedPValue.ofExpandedValue(output),
+                TaggedPValue.of(replacementLongs.getKey(), replacementLongs.getValue()))));
     hierarchy.popNode();
 
     final Set<Node> visitedCompositeNodes = new HashSet<>();
@@ -489,15 +492,4 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
   }
-
-  private static List<PValue> fromTaggedValues(List<TaggedPValue> taggedValues) {
-    return Lists.transform(
-        taggedValues,
-        new Function<TaggedPValue, PValue>() {
-          @Override
-          public PValue apply(TaggedPValue input) {
-            return input.getValue();
-          }
-        });
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
index 2482f32..76cba01 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.values;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -28,9 +28,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.testing.EqualsTester;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.hamcrest.Matchers;
@@ -90,27 +90,25 @@ public class PCollectionListTest {
     assertThat(
         fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxRecordsCount));
 
-    List<TaggedPValue> expansion = fromEmpty.expand();
-    // TaggedPValues are stable between expansions
+    Map<TupleTag<?>, PValue> expansion = fromEmpty.expand();
+    // Tag->PValue mappings are stable between expansions. They don't need to be stable across
+    // different list instances, though
     assertThat(expansion, equalTo(fromEmpty.expand()));
-    // TaggedPValues are equivalent between equivalent lists
-    assertThat(
-        expansion,
-        equalTo(
-            PCollectionList.of(unboundedCount)
-                .and(createOne)
-                .and(boundedCount)
-                .and(maxRecordsCount)
-                .expand()));
 
     List<PCollection<Long>> expectedList =
         ImmutableList.of(unboundedCount, createOne, boundedCount, maxRecordsCount);
-    for (int i = 0; i < expansion.size(); i++) {
-      assertThat(
-          "Index " + i + " should have equal PValue",
-          expansion.get(i).getValue(),
-          Matchers.<PValue>equalTo(expectedList.get(i)));
-    }
+    assertThat(expansion.values(), containsInAnyOrder(expectedList.toArray()));
+  }
+
+  @Test
+  public void testExpandWithDuplicates() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> createOne = p.apply("CreateOne", Create.of(1L, 2L, 3L));
+
+    PCollectionList<Long> list = PCollectionList.of(createOne).and(createOne).and(createOne);
+    assertThat(
+        list.expand().values(),
+        Matchers.<PValue>containsInAnyOrder(createOne, createOne, createOne));
   }
 
   @Test
@@ -121,15 +119,15 @@ public class PCollectionListTest {
     PCollection<String> third = p.apply("Syntactic", Create.of("eggs", "baz"));
 
     EqualsTester tester = new EqualsTester();
-    tester.addEqualityGroup(PCollectionList.empty(p), PCollectionList.empty(p));
-    tester.addEqualityGroup(PCollectionList.of(first).and(second));
+//    tester.addEqualityGroup(PCollectionList.empty(p), PCollectionList.empty(p));
+//    tester.addEqualityGroup(PCollectionList.of(first).and(second));
     // Constructors should all produce equivalent
     tester.addEqualityGroup(
         PCollectionList.of(first).and(second).and(third),
         PCollectionList.of(first).and(second).and(third),
-        PCollectionList.<String>empty(p).and(first).and(second).and(third),
-        PCollectionList.of(ImmutableList.of(first, second, third)),
-        PCollectionList.of(first).and(ImmutableList.of(second, third)),
+//        PCollectionList.<String>empty(p).and(first).and(second).and(third),
+//        PCollectionList.of(ImmutableList.of(first, second, third)),
+//        PCollectionList.of(first).and(ImmutableList.of(second, third)),
         PCollectionList.of(ImmutableList.of(first, second)).and(third));
     // Order is considered
     tester.addEqualityGroup(PCollectionList.of(first).and(third).and(second));
@@ -137,28 +135,4 @@ public class PCollectionListTest {
 
     tester.testEquals();
   }
-
-  @Test
-  public void testExpansionOrderWithDuplicates() {
-    TestPipeline p = TestPipeline.create();
-    BoundedCountingInput count = CountingInput.upTo(10L);
-    PCollection<Long> firstCount = p.apply("CountFirst", count);
-    PCollection<Long> secondCount = p.apply("CountSecond", count);
-
-    PCollectionList<Long> counts =
-        PCollectionList.of(firstCount).and(secondCount).and(firstCount).and(firstCount);
-
-    ImmutableList<PCollection<Long>> expectedOrder =
-        ImmutableList.of(firstCount, secondCount, firstCount, firstCount);
-    PCollectionList<Long> reconstructed = PCollectionList.empty(p);
-    assertThat(counts.expand(), hasSize(4));
-    for (int i = 0; i < 4; i++) {
-      PValue value = counts.expand().get(i).getValue();
-      assertThat(
-          "Index " + i + " should be equal", value,
-          Matchers.<PValue>equalTo(expectedOrder.get(i)));
-      reconstructed = reconstructed.and((PCollection<Long>) value);
-    }
-    assertThat(reconstructed, equalTo(counts));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 010d726..0a0abd6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.PAssert;
@@ -153,8 +154,8 @@ public final class PCollectionTupleTest implements Serializable {
         PCollectionTuple.of(intTag, ints).and(longTag, longs).and(strTag, strs);
     assertThat(tuple.getAll(), equalTo(pcsByTag));
     PCollectionTuple reconstructed = PCollectionTuple.empty(p);
-    for (TaggedPValue taggedValue : tuple.expand()) {
-      TupleTag<?> tag = taggedValue.getTag();
+    for (Entry<TupleTag<?>, PValue> taggedValue : tuple.expand().entrySet()) {
+      TupleTag<?> tag = taggedValue.getKey();
       PValue value = taggedValue.getValue();
       assertThat("The tag should map back to the value", tuple.get(tag), equalTo(value));
       assertThat(value, Matchers.<PValue>equalTo(pcsByTag.get(tag)));

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index 07fbc68..3e0f51c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -18,12 +18,11 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import java.util.Collections;
-import java.util.List;
-
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.POutputValueBase;
-import org.apache.beam.sdk.values.TaggedPValue;
-
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * The result of a {@link BigQueryIO.Write} transform.
@@ -37,8 +36,8 @@ final class WriteResult extends POutputValueBase {
   }
 
   @Override
-  public List<TaggedPValue> expand() {
-    return Collections.emptyList();
+  public Map<TupleTag<?>, PValue> expand() {
+    return Collections.emptyMap();
   }
 
   private WriteResult(Pipeline pipeline) {


[4/4] beam git commit: Remove Orderedness of Input, Output expansions

Posted by tg...@apache.org.
Remove Orderedness of Input, Output expansions

This brings the PInput and POutput expansion signatures back in line
with the Runner API.
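
For context, a minimal sketch (not part of this patch) of what the new
contract looks like to a caller: expand() now returns a Map keyed by TupleTag
rather than an ordered List<TaggedPValue>, so consumers look expanded values
up by tag instead of by list position. The class and helper name below are
hypothetical and purely illustrative.

import java.util.Map;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

class ExpansionLookupExample {
  // Hypothetical helper: fetch one expanded value by its local tag. With the
  // unordered Map expansion there is no meaningful "i-th" output any more.
  static PValue lookup(POutput output, TupleTag<?> tag) {
    Map<TupleTag<?>, PValue> expansion = output.expand();
    PValue value = expansion.get(tag);
    if (value == null) {
      throw new IllegalArgumentException("No expanded value for tag " + tag);
    }
    return value;
  }
}

This mirrors the Runner API, where a transform's inputs and outputs are keyed
by local name rather than carried in a significant order.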


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e5737fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e5737fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e5737fd

Branch: refs/heads/master
Commit: 0e5737fdbee5478ee7f39c4b1a1ac95353ec7b08
Parents: a5a5bf9
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 4 16:51:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 12:40:46 2017 -0700

----------------------------------------------------------------------
 .../FlattenPCollectionTranslator.java           |  13 +-
 .../apex/translation/ParDoTranslator.java       |  13 +-
 .../apex/translation/TranslationContext.java    |  10 +-
 .../DeduplicatedFlattenFactory.java             |  18 ++-
 .../EmptyFlattenAsCreateFactory.java            |   7 +-
 .../core/construction/PTransformMatchers.java   |   5 +-
 .../core/construction/PrimitiveCreate.java      |   7 +-
 .../core/construction/ReplacementOutputs.java   |  63 ++++-----
 .../SingleInputOutputOverrideFactory.java       |  11 +-
 .../UnsupportedOverrideFactory.java             |   8 +-
 .../DeduplicatedFlattenFactoryTest.java         |   6 +-
 .../EmptyFlattenAsCreateFactoryTest.java        |   8 +-
 .../construction/PTransformMatchersTest.java    | 132 +++++++++----------
 .../construction/ReplacementOutputsTest.java    | 109 ++-------------
 .../SingleInputOutputOverrideFactoryTest.java   |   6 +-
 .../UnsupportedOverrideFactoryTest.java         |   7 +-
 .../direct/BoundedReadEvaluatorFactory.java     |   2 +-
 .../beam/runners/direct/DirectGraphVisitor.java |   5 +-
 .../direct/ExecutorServiceParallelExecutor.java |   4 +-
 .../runners/direct/FlattenEvaluatorFactory.java |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |   4 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  14 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  10 +-
 .../direct/ParDoMultiOverrideFactory.java       |   9 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   8 +-
 .../direct/TestStreamEvaluatorFactory.java      |   8 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |   4 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   4 +-
 .../runners/direct/ViewOverrideFactory.java     |   9 +-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../runners/direct/WindowEvaluatorFactory.java  |   2 +-
 .../direct/WriteWithShardingFactory.java        |  10 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   7 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   2 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   2 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |   5 +-
 .../runners/direct/ViewOverrideFactoryTest.java |   2 +-
 .../flink/FlinkBatchTransformTranslators.java   |  36 ++---
 .../flink/FlinkBatchTranslationContext.java     |  11 +-
 .../flink/FlinkStreamingPipelineTranslator.java |   9 +-
 .../FlinkStreamingTransformTranslators.java     |  32 ++---
 .../flink/FlinkStreamingTranslationContext.java |  12 +-
 .../dataflow/BatchStatefulParDoOverrides.java   |  15 +--
 .../dataflow/DataflowPipelineTranslator.java    |  20 +--
 .../beam/runners/dataflow/DataflowRunner.java   |  27 ++--
 .../runners/dataflow/TransformTranslator.java   |   6 +-
 .../dataflow/DataflowPipelineJobTest.java       |   7 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  20 +--
 .../beam/runners/spark/TestSparkRunner.java     |   7 +-
 .../spark/translation/EvaluationContext.java    |  11 +-
 .../spark/translation/TransformTranslator.java  |  25 ++--
 .../streaming/StreamingTransformTranslator.java |  20 +--
 .../sdk/runners/PTransformOverrideFactory.java  |  10 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  80 +++++------
 .../beam/sdk/transforms/AppliedPTransform.java  |  17 ++-
 .../transforms/join/KeyedPCollectionTuple.java  |  12 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   6 +-
 .../apache/beam/sdk/values/PCollectionList.java |  27 ++--
 .../beam/sdk/values/PCollectionTuple.java       |  10 +-
 .../java/org/apache/beam/sdk/values/PDone.java  |   6 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java     |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |   4 +-
 .../org/apache/beam/sdk/values/PValueBase.java  |   6 +-
 .../apache/beam/sdk/values/TaggedPValue.java    |   5 +
 .../java/org/apache/beam/sdk/PipelineTest.java  |  20 ++-
 .../sdk/runners/TransformHierarchyTest.java     |  72 +++++-----
 .../beam/sdk/values/PCollectionListTest.java    |  70 ++++------
 .../beam/sdk/values/PCollectionTupleTest.java   |   5 +-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   |  11 +-
 71 files changed, 526 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 080c5e9..440b801 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -32,7 +32,8 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * {@link Flatten.PCollections} translation to Apex operator.
@@ -63,15 +64,15 @@ class FlattenPCollectionTranslator<T> implements
     }
   }
 
-  private List<PCollection<T>> extractPCollections(List<TaggedPValue> inputs) {
+  private List<PCollection<T>> extractPCollections(Map<TupleTag<?>, PValue> inputs) {
     List<PCollection<T>> collections = Lists.newArrayList();
-    for (TaggedPValue pv : inputs) {
+    for (PValue pv : inputs.values()) {
       checkArgument(
-          pv.getValue() instanceof PCollection,
+          pv instanceof PCollection,
           "Non-PCollection provided as input to flatten: %s of type %s",
-          pv.getValue(),
+          pv,
           pv.getClass().getSimpleName());
-      collections.add((PCollection<T>) pv.getValue());
+      collections.add((PCollection<T>) pv);
     }
     return collections;
   }
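
For context: the pattern this file converges on -- iterating the tag-keyed map's
values() instead of unwrapping TaggedPValue entries -- recurs throughout the
patch. A minimal, self-contained sketch of the same idea (the class and method
names below are illustrative only, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    class FlattenInputs {
      // Collect every PCollection input of a transform from its tag-keyed
      // input map, rejecting anything that is not a PCollection.
      @SuppressWarnings("unchecked")
      static <T> List<PCollection<T>> extract(Map<TupleTag<?>, PValue> inputs) {
        List<PCollection<T>> collections = new ArrayList<>();
        for (PValue value : inputs.values()) {
          if (!(value instanceof PCollection)) {
            throw new IllegalArgumentException(
                "Non-PCollection provided as input to flatten: " + value);
          }
          collections.add((PCollection<T>) value);
        }
        return collections;
      }
    }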

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index fa9d21d..9213c1f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
@@ -38,7 +39,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,8 +82,8 @@ class ParDoTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    List<TaggedPValue> outputs = context.getOutputs();
-    PCollection<InputT> input = context.getInput();
+    Map<TupleTag<?>, PValue> outputs = context.getOutputs();
+    PCollection<InputT> input = (PCollection<InputT>) context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
     WindowedValueCoder<InputT> wvInputCoder =
@@ -100,7 +101,7 @@ class ParDoTranslator<InputT, OutputT>
             context.getStateBackend());
 
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
-    for (TaggedPValue output : outputs) {
+    for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
       checkArgument(
           output.getValue() instanceof PCollection,
           "%s %s outputs non-PCollection %s of type %s",
@@ -109,12 +110,12 @@ class ParDoTranslator<InputT, OutputT>
           output.getValue(),
           output.getValue().getClass().getSimpleName());
       PCollection<?> pc = (PCollection<?>) output.getValue();
-      if (output.getTag().equals(transform.getMainOutputTag())) {
+      if (output.getKey().equals(transform.getMainOutputTag())) {
         ports.put(pc, operator.output);
       } else {
         int portIndex = 0;
         for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
-          if (tag.equals(output.getTag())) {
+          if (tag.equals(output.getKey())) {
             ports.put(pc, operator.sideOutputPorts[portIndex]);
             break;
           }
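
The output side changes symmetrically: what used to be matched via
TaggedPValue.getTag() is now matched on the entry key of the tag-to-value map.
A sketch of that routing step in isolation (names are placeholders; the real
code wires the matched PCollections to Apex operator ports):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    class OutputRouting {
      // Partition a transform's tag-keyed outputs: every entry whose tag is
      // not the main output tag is treated as an additional (side) output.
      static Map<TupleTag<?>, PCollection<?>> sideOutputs(
          Map<TupleTag<?>, PValue> outputs, TupleTag<?> mainOutputTag) {
        Map<TupleTag<?>, PCollection<?>> side = new LinkedHashMap<>();
        for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
          if (!(output.getValue() instanceof PCollection)) {
            throw new IllegalArgumentException(
                "Transform outputs non-PCollection " + output.getValue());
          }
          if (!output.getKey().equals(mainOutputTag)) {
            side.put(output.getKey(), (PCollection<?>) output.getValue());
          }
        }
        return side;
      }
    }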

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 81507ef..c78028e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -42,7 +42,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -85,20 +85,20 @@ class TranslationContext {
     return getCurrentTransform().getFullName();
   }
 
-  public List<TaggedPValue> getInputs() {
+  public Map<TupleTag<?>, PValue> getInputs() {
     return getCurrentTransform().getInputs();
   }
 
   public <InputT extends PValue> InputT getInput() {
-    return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
+    return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values());
   }
 
-  public List<TaggedPValue> getOutputs() {
+  public Map<TupleTag<?>, PValue> getOutputs() {
     return getCurrentTransform().getOutputs();
   }
 
   public <OutputT extends PValue> OutputT getOutput() {
-    return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
+    return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs().values());
   }
 
   private AppliedPTransform<?, ?, ?> getCurrentTransform() {

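getInput() and getOutput() keep their single-value contract; only the container
they unwrap changes from a one-element list to a one-entry map. The same
extraction as a free-standing sketch (helper name is illustrative):

    import com.google.common.collect.Iterables;
    import java.util.Map;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    class SingleValues {
      // Return the sole value of a tag-keyed map; Iterables.getOnlyElement
      // throws if the map does not contain exactly one entry.
      @SuppressWarnings("unchecked")
      static <T extends PValue> T onlyValue(Map<TupleTag<?>, PValue> values) {
        return (T) Iterables.getOnlyElement(values.values());
      }
    }
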
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
index 093385e..c12c548 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.core.construction;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link PTransformOverrideFactory} that will apply a flatten where no element appears in the
@@ -46,6 +45,7 @@ public class DeduplicatedFlattenFactory<T>
   }
 
   private DeduplicatedFlattenFactory() {}
+
   @Override
   public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
       PCollections<T> transform) {
@@ -75,12 +75,16 @@ public class DeduplicatedFlattenFactory<T>
     };
   }
 
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>The input {@link PCollectionList} that is constructed will have the same values, in the same order, as the values of the input map.
+   */
   @Override
-  public PCollectionList<T> getInput(
-      List<TaggedPValue> inputs, Pipeline p) {
+  public PCollectionList<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
     PCollectionList<T> pCollections = PCollectionList.empty(p);
-    for (TaggedPValue input : inputs) {
-      PCollection<T> pcollection = (PCollection<T>) input.getValue();
+    for (PValue input : inputs.values()) {
+      PCollection<T> pcollection = (PCollection<T>) input;
       pCollections = pCollections.and(pcollection);
     }
     return pCollections;
@@ -88,7 +92,7 @@ public class DeduplicatedFlattenFactory<T>
 
   @Override
   public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollection<T> newOutput) {
+      Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
     return ReplacementOutputs.singleton(outputs, newOutput);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
index 4328cf3..936bc08 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
@@ -57,7 +56,7 @@ public class EmptyFlattenAsCreateFactory<T>
 
   @Override
   public PCollectionList<T> getInput(
-      List<TaggedPValue> inputs, Pipeline p) {
+      Map<TupleTag<?>, PValue> inputs, Pipeline p) {
     checkArgument(
         inputs.isEmpty(),
         "Unexpected nonempty input %s for %s",
@@ -68,7 +67,7 @@ public class EmptyFlattenAsCreateFactory<T>
 
   @Override
   public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollection<T> newOutput) {
+      Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
     return ReplacementOutputs.singleton(outputs, newOutput);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 6437f7e..94ec38c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} based on the class of the
@@ -250,8 +249,8 @@ public class PTransformMatchers {
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
         if (application.getTransform() instanceof Flatten.PCollections) {
           Set<PValue> observed = new HashSet<>();
-          for (TaggedPValue pvalue : application.getInputs()) {
-            boolean firstInstance = observed.add(pvalue.getValue());
+          for (PValue pvalue : application.getInputs().values()) {
+            boolean firstInstance = observed.add(pvalue);
             if (!firstInstance) {
               return true;
             }
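
Duplicate detection still works by value identity: a map keyed by TupleTag can
hold the same PValue under two different tags, so the matcher tracks the values
it has already seen. The check on its own (class name is a placeholder):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    class DuplicateInputs {
      // True if any PValue appears more than once among the map's values,
      // i.e. the same PCollection is flattened into the result twice.
      static boolean hasDuplicates(Map<TupleTag<?>, PValue> inputs) {
        Set<PValue> observed = new HashSet<>();
        for (PValue value : inputs.values()) {
          if (!observed.add(value)) {
            return true;
          }
        }
        return false;
      }
    }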

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
index 7bd38b0..9335f3a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.core.construction;
 
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -30,7 +29,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * An implementation of {@link Create} that returns a primitive {@link PCollection}.
@@ -63,13 +62,13 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
     }
 
     @Override
-    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
       return p.begin();
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<T> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
index 11b4449..3d485ae 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
@@ -21,10 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.values.POutput;
@@ -39,60 +40,40 @@ public class ReplacementOutputs {
   private ReplacementOutputs() {}
 
   public static Map<PValue, ReplacementOutput> singleton(
-      List<TaggedPValue> original, PValue replacement) {
-    TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacement.expand());
-    return ImmutableMap.<PValue, ReplacementOutput>builder()
-        .put(
-            taggedReplacement.getValue(),
-            ReplacementOutput.of(Iterables.getOnlyElement(original), taggedReplacement))
-        .build();
-  }
-
-  public static Map<PValue, ReplacementOutput> ordered(
-      List<TaggedPValue> original, POutput replacement) {
-    ImmutableMap.Builder<PValue, ReplacementOutput> result = ImmutableMap.builder();
-    List<TaggedPValue> replacements = replacement.expand();
-    checkArgument(
-        original.size() == replacements.size(),
-        "Original and Replacements must be the same size. Original: %s Replacement: %s",
-        original.size(),
-        replacements.size());
-    int i = 0;
-    for (TaggedPValue replacementPvalue : replacements) {
-      result.put(
-          replacementPvalue.getValue(), ReplacementOutput.of(original.get(i), replacementPvalue));
-      i++;
-    }
-    return result.build();
+      Map<TupleTag<?>, PValue> original, PValue replacement) {
+    Entry<TupleTag<?>, PValue> originalElement = Iterables.getOnlyElement(original.entrySet());
+    TupleTag<?> replacementTag = Iterables.getOnlyElement(replacement.expand().entrySet()).getKey();
+    return Collections.singletonMap(
+        replacement,
+        ReplacementOutput.of(
+            TaggedPValue.of(originalElement.getKey(), originalElement.getValue()),
+            TaggedPValue.of(replacementTag, replacement)));
   }
 
   public static Map<PValue, ReplacementOutput> tagged(
-      List<TaggedPValue> original, POutput replacement) {
+      Map<TupleTag<?>, PValue> original, POutput replacement) {
     Map<TupleTag<?>, TaggedPValue> originalTags = new HashMap<>();
-    for (TaggedPValue value : original) {
-      TaggedPValue former = originalTags.put(value.getTag(), value);
-      checkArgument(
-          former == null || former.equals(value),
-          "Found two tags in an expanded output which map to different values: output: %s "
-              + "Values: %s and %s",
-          original,
-          former,
-          value);
+    for (Map.Entry<TupleTag<?>, PValue> originalValue : original.entrySet()) {
+      originalTags.put(
+          originalValue.getKey(),
+          TaggedPValue.of(originalValue.getKey(), originalValue.getValue()));
     }
     ImmutableMap.Builder<PValue, ReplacementOutput> resultBuilder = ImmutableMap.builder();
     Set<TupleTag<?>> missingTags = new HashSet<>(originalTags.keySet());
-    for (TaggedPValue replacementValue : replacement.expand()) {
-      TaggedPValue mapped = originalTags.get(replacementValue.getTag());
+    for (Map.Entry<TupleTag<?>, PValue> replacementValue : replacement.expand().entrySet()) {
+      TaggedPValue mapped = originalTags.get(replacementValue.getKey());
       checkArgument(
           mapped != null,
           "Missing original output for Tag %s and Value %s Between original %s and replacement %s",
-          replacementValue.getTag(),
+          replacementValue.getKey(),
           replacementValue.getValue(),
           original,
           replacement.expand());
       resultBuilder.put(
-          replacementValue.getValue(), ReplacementOutput.of(mapped, replacementValue));
-      missingTags.remove(replacementValue.getTag());
+          replacementValue.getValue(),
+          ReplacementOutput.of(
+              mapped, TaggedPValue.of(replacementValue.getKey(), replacementValue.getValue())));
+      missingTags.remove(replacementValue.getKey());
     }
     ImmutableMap<PValue, ReplacementOutput> result = resultBuilder.build();
     checkArgument(

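With the list-based ordered() variant removed, override factories are left with
singleton() and tagged(), both keyed by TupleTag. A usage sketch under the new
signatures (the wrapper class is illustrative; 'original' would typically be
the replaced transform's getOutputs() map):

    import java.util.Map;
    import org.apache.beam.runners.core.construction.ReplacementOutputs;
    import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    class MapOutputsExample {
      // Single-output replacement: both the original map and the replacement
      // value must expand to exactly one tagged value.
      static Map<PValue, ReplacementOutput> mapSingleOutput(
          Map<TupleTag<?>, PValue> original, PCollection<?> replacement) {
        return ReplacementOutputs.singleton(original, replacement);
      }
    }
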
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
index 43bf556..6d0d571 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
@@ -19,17 +19,16 @@
 package org.apache.beam.runners.core.construction;
 
 import com.google.common.collect.Iterables;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces a
- * {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)} are
+ * {@link PValue}. {@link #getInput(Map, Pipeline)} and {@link #mapOutputs(Map, PValue)} are
  * implemented.
  */
 public abstract class SingleInputOutputOverrideFactory<
@@ -38,13 +37,13 @@ public abstract class SingleInputOutputOverrideFactory<
         TransformT extends PTransform<InputT, OutputT>>
     implements PTransformOverrideFactory<InputT, OutputT, TransformT> {
   @Override
-  public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
-    return (InputT) Iterables.getOnlyElement(inputs).getValue();
+  public final InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+    return (InputT) Iterables.getOnlyElement(inputs.values());
   }
 
   @Override
   public final Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, OutputT newOutput) {
+      Map<TupleTag<?>, PValue> outputs, OutputT newOutput) {
     return ReplacementOutputs.singleton(outputs, newOutput);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
index 38cbd2a..7b9d704 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.core.construction;
 
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -26,7 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link PTransformOverrideFactory} that throws an exception when a call to
@@ -60,12 +59,13 @@ public final class UnsupportedOverrideFactory<
   }
 
   @Override
-  public InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
+  public InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
     throw new UnsupportedOperationException(message);
   }
 
   @Override
-  public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput) {
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      Map<TupleTag<?>, PValue> outputs, OutputT newOutput) {
     throw new UnsupportedOperationException(message);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
index a251f5a..14aa1e6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -106,7 +106,7 @@ public class DeduplicatedFlattenFactoryTest {
         Matchers.<PValue, ReplacementOutput>hasEntry(
             replacement,
             ReplacementOutput.of(
-                Iterables.getOnlyElement(original.expand()),
-                Iterables.getOnlyElement(replacement.expand()))));
+                TaggedPValue.ofExpandedValue(original),
+                TaggedPValue.ofExpandedValue(replacement))));
   }
 }
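
The assertion above now builds its expected value with
TaggedPValue.ofExpandedValue(...) rather than Iterables.getOnlyElement(...).
Judging from these call sites, it pairs a PValue with the single tag under
which it expands; roughly the following, written out only to make the test's
expectation concrete (a reading of the call site, not the patch's
implementation):

    import com.google.common.collect.Iterables;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TaggedPValue;
    import org.apache.beam.sdk.values.TupleTag;

    class ExpandedValues {
      // Pair a PValue with the only tag in its (single-entry) expansion.
      static TaggedPValue ofExpandedValue(PValue value) {
        TupleTag<?> onlyTag = Iterables.getOnlyElement(value.expand().keySet());
        return TaggedPValue.of(onlyTag, value);
      }
    }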

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
index ad9d908..90bbee7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.Iterables;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.io.CountingInput;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -55,7 +55,7 @@ public class EmptyFlattenAsCreateFactoryTest {
   @Test
   public void getInputEmptySucceeds() {
     assertThat(
-        factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline).size(), equalTo(0));
+        factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), pipeline).size(), equalTo(0));
   }
 
   @Test
@@ -80,8 +80,8 @@ public class EmptyFlattenAsCreateFactoryTest {
         Matchers.<PValue, ReplacementOutput>hasEntry(
             replacement,
             ReplacementOutput.of(
-                Iterables.getOnlyElement(original.expand()),
-                Iterables.getOnlyElement(replacement.expand()))));
+                TaggedPValue.ofExpandedValue(original),
+                TaggedPValue.ofExpandedValue(replacement))));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index d053f62..4084cdc 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -65,7 +65,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.Matchers;
@@ -385,17 +385,15 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
-                of(
-                    "EmptyFlatten",
-                    Collections.<TaggedPValue>emptyList(),
-                    Collections.singletonList(
-                        TaggedPValue.of(
-                            new TupleTag<Object>(),
-                            PCollection.createPrimitiveOutputInternal(
-                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
-                    Flatten.pCollections(),
-                    p);
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+                "EmptyFlatten",
+                Collections.<TupleTag<?>, PValue>emptyMap(),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                Flatten.pCollections(),
+                p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
   }
@@ -404,21 +402,18 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
-                of(
-                    "Flatten",
-                    Collections.singletonList(
-                        TaggedPValue.of(
-                            new TupleTag<Object>(),
-                            PCollection.createPrimitiveOutputInternal(
-                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
-                    Collections.singletonList(
-                        TaggedPValue.of(
-                            new TupleTag<Object>(),
-                            PCollection.createPrimitiveOutputInternal(
-                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
-                    Flatten.pCollections(),
-                    p);
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+                "Flatten",
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                Flatten.pCollections(),
+                p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
   }
@@ -427,18 +422,16 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
-                of(
-                    "EmptyFlatten",
-                    Collections.<TaggedPValue>emptyList(),
-                    Collections.singletonList(
-                        TaggedPValue.of(
-                            new TupleTag<Object>(),
-                            PCollection.createPrimitiveOutputInternal(
-                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
-                    Flatten.iterables() /* This isn't actually possible to construct,
+            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+                "EmptyFlatten",
+                Collections.<TupleTag<?>, PValue>emptyMap(),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                Flatten.iterables() /* This isn't actually possible to construct,
                                  * but for the sake of example */,
-                    p);
+                p);
 
     assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
   }
@@ -447,19 +440,16 @@ public class PTransformMatchersTest implements Serializable {
   public void flattenWithDuplicateInputsWithoutDuplicates() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
-                of(
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
                 "Flatten",
-                Collections.singletonList(
-                    TaggedPValue.of(
-                        new TupleTag<Object>(),
-                        PCollection.createPrimitiveOutputInternal(
-                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
-                Collections.singletonList(
-                    TaggedPValue.of(
-                        new TupleTag<Object>(),
-                        PCollection.createPrimitiveOutputInternal(
-                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
                 Flatten.pCollections(),
                 p);
 
@@ -468,21 +458,21 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void flattenWithDuplicateInputsWithDuplicates() {
-    PCollection<Object> duplicate = PCollection.createPrimitiveOutputInternal(p,
-        WindowingStrategy.globalDefault(),
-        IsBounded.BOUNDED);
+    PCollection<Object> duplicate =
+        PCollection.createPrimitiveOutputInternal(
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
     AppliedPTransform application =
         AppliedPTransform
             .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
                 "Flatten",
-                ImmutableList.of(
-                    TaggedPValue.of(new TupleTag<Object>(), duplicate),
-                    TaggedPValue.of(new TupleTag<Object>(), duplicate)),
-                Collections.singletonList(
-                    TaggedPValue.of(
-                        new TupleTag<Object>(),
-                        PCollection.createPrimitiveOutputInternal(
-                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                ImmutableMap.<TupleTag<?>, PValue>builder()
+                    .put(new TupleTag<Object>(), duplicate)
+                    .put(new TupleTag<Object>(), duplicate)
+                    .build(),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
                 Flatten.pCollections(),
                 p);
 
@@ -493,15 +483,13 @@ public class PTransformMatchersTest implements Serializable {
   public void flattenWithDuplicateInputsNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
-                of(
+            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
                 "EmptyFlatten",
-                Collections.<TaggedPValue>emptyList(),
-                Collections.singletonList(
-                    TaggedPValue.of(
-                        new TupleTag<Object>(),
-                        PCollection.createPrimitiveOutputInternal(
-                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                Collections.<TupleTag<?>, PValue>emptyMap(),
+                Collections.<TupleTag<?>, PValue>singletonMap(
+                    new TupleTag<Object>(),
+                    PCollection.createPrimitiveOutputInternal(
+                        p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
                 Flatten.iterables() /* This isn't actually possible to construct,
                                  * but for the sake of example */,
                 p);
@@ -541,8 +529,8 @@ public class PTransformMatchersTest implements Serializable {
   private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
     return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
         "Write",
-        Collections.<TaggedPValue>emptyList(),
-        Collections.<TaggedPValue>emptyList(),
+        Collections.<TupleTag<?>, PValue>emptyMap(),
+        Collections.<TupleTag<?>, PValue>emptyMap(),
         write,
         p);
   }
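
The test fixtures follow one convention for building these maps: Collections
for the empty and single-entry cases, ImmutableMap when two tags point at the
same value. Condensed into a small helper (purely illustrative):

    import com.google.common.collect.ImmutableMap;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    class TestValueMaps {
      static Map<TupleTag<?>, PValue> none() {
        return Collections.emptyMap();
      }

      static Map<TupleTag<?>, PValue> single(TupleTag<?> tag, PValue value) {
        return Collections.<TupleTag<?>, PValue>singletonMap(tag, value);
      }

      // Two distinct tags mapping to the same PValue models the
      // "flatten with duplicate inputs" case.
      static Map<TupleTag<?>, PValue> duplicated(
          TupleTag<?> first, TupleTag<?> second, PValue value) {
        return ImmutableMap.<TupleTag<?>, PValue>of(first, value, second, value);
      }
    }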

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
index abfdeef..00c436d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -21,18 +21,15 @@ package org.apache.beam.runners.core.construction;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -79,8 +76,10 @@ public class ReplacementOutputsTest {
     assertThat(replacements, Matchers.<PValue>hasKey(replacementInts));
 
     ReplacementOutput replacement = replacements.get(replacementInts);
-    TaggedPValue taggedInts = Iterables.getOnlyElement(ints.expand());
-    assertThat(replacement.getOriginal(), equalTo(taggedInts));
+    Map.Entry<TupleTag<?>, PValue> taggedInts = Iterables.getOnlyElement(ints.expand().entrySet());
+    assertThat(
+        replacement.getOriginal().getTag(), Matchers.<TupleTag<?>>equalTo(taggedInts.getKey()));
+    assertThat(replacement.getOriginal().getValue(), equalTo(taggedInts.getValue()));
     assertThat(replacement.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
   }
 
@@ -88,44 +87,11 @@ public class ReplacementOutputsTest {
   public void singletonMultipleOriginalsThrows() {
     thrown.expect(IllegalArgumentException.class);
     ReplacementOutputs.singleton(
-        ImmutableList.copyOf(Iterables.concat(ints.expand(), moreInts.expand())), replacementInts);
-  }
-
-  @Test
-  public void orderedSucceeds() {
-    List<TaggedPValue> originals = PCollectionList.of(ints).and(moreInts).expand();
-    Map<PValue, ReplacementOutput> replacements =
-        ReplacementOutputs.ordered(
-            originals, PCollectionList.of(replacementInts).and(moreReplacementInts));
-    assertThat(
-        replacements.keySet(),
-        Matchers.<PValue>containsInAnyOrder(replacementInts, moreReplacementInts));
-
-    ReplacementOutput intsMapping = replacements.get(replacementInts);
-    assertThat(intsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(ints));
-    assertThat(intsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
-
-    ReplacementOutput moreIntsMapping = replacements.get(moreReplacementInts);
-    assertThat(moreIntsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(moreInts));
-    assertThat(
-        moreIntsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(moreReplacementInts));
-  }
-
-  @Test
-  public void orderedTooManyReplacements() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("same size");
-    ReplacementOutputs.ordered(
-        PCollectionList.of(ints).expand(),
-        PCollectionList.of(replacementInts).and(moreReplacementInts));
-  }
-
-  @Test
-  public void orderedTooFewReplacements() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("same size");
-    ReplacementOutputs.ordered(
-        PCollectionList.of(ints).and(moreInts).expand(), PCollectionList.of(moreReplacementInts));
+        ImmutableMap.<TupleTag<?>, PValue>builder()
+            .putAll(ints.expand())
+            .putAll(moreInts.expand())
+            .build(),
+        replacementInts);
   }
 
   private TupleTag<Integer> intsTag = new TupleTag<>();
@@ -168,61 +134,6 @@ public class ReplacementOutputsTest {
                 TaggedPValue.of(moreIntsTag, moreReplacementInts))));
   }
 
-  /**
-   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where the first
-   * argument contains multiple copies of the same {@link TaggedPValue}, the call succeeds using
-   * that mapping.
-   */
-  @Test
-  public void taggedMultipleInstances() {
-    List<TaggedPValue> original =
-        ImmutableList.of(
-            TaggedPValue.of(intsTag, ints),
-            TaggedPValue.of(strsTag, strs),
-            TaggedPValue.of(intsTag, ints));
-
-    Map<PValue, ReplacementOutput> replacements =
-        ReplacementOutputs.tagged(
-            original, PCollectionTuple.of(strsTag, replacementStrs).and(intsTag, replacementInts));
-    assertThat(
-        replacements.keySet(),
-        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts));
-    ReplacementOutput intsReplacement = replacements.get(replacementInts);
-    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
-
-    assertThat(
-        intsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts))));
-    assertThat(
-        strsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs))));
-  }
-
-  /**
-   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where a single tag
-   * has multiple {@link PValue PValues} mapped to it, the call fails.
-   */
-  @Test
-  public void taggedMultipleConflictingInstancesThrows() {
-    List<TaggedPValue> original =
-        ImmutableList.of(
-            TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, moreReplacementInts));
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("different values");
-    thrown.expectMessage(intsTag.toString());
-    thrown.expectMessage(ints.toString());
-    thrown.expectMessage(moreReplacementInts.toString());
-    ReplacementOutputs.tagged(
-        original,
-        PCollectionTuple.of(strsTag, replacementStrs)
-            .and(moreIntsTag, moreReplacementInts)
-            .and(intsTag, replacementInts));
-  }
-
   @Test
   public void taggedMissingReplacementThrows() {
     PCollectionTuple original =

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
index b4cdd1f..07352f5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
@@ -32,6 +31,7 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -97,8 +97,8 @@ public class SingleInputOutputOverrideFactoryTest implements Serializable {
         Matchers.<PValue, ReplacementOutput>hasEntry(
             reappliedOutput,
             ReplacementOutput.of(
-                Iterables.getOnlyElement(output.expand()),
-                Iterables.getOnlyElement(reappliedOutput.expand()))));
+                TaggedPValue.ofExpandedValue(output),
+                TaggedPValue.ofExpandedValue(reappliedOutput))));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
index f33d0f9..81ce00d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
@@ -23,7 +23,8 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -53,13 +54,13 @@ public class UnsupportedOverrideFactoryTest {
   public void getInputThrows() {
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage(message);
-    factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline);
+    factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), pipeline);
   }
 
   @Test
   public void mapOutputThrows() {
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage(message);
-    factory.mapOutputs(Collections.<TaggedPValue>emptyList(), PDone.in(pipeline));
+    factory.mapOutputs(Collections.<TupleTag<?>, PValue>emptyMap(), PDone.in(pipeline));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 57735e7..5bd6f7e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -117,7 +117,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         ExecutorService executor) {
       this.evaluationContext = evaluationContext;
       this.outputPCollection =
-          (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
+          (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values());
       this.resultBuilder = StepTransformResult.withoutHold(transform);
       this.minimumDynamicSplitSize = minimumDynamicSplitSize;
       this.produceSplitExecutor = executor;

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 7e6845d..c342136 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -83,8 +82,8 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
     if (node.getInputs().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
-      for (TaggedPValue value : node.getInputs()) {
-        primitiveConsumers.put(value.getValue(), appliedTransform);
+      for (PValue value : node.getInputs().values()) {
+        primitiveConsumers.put(value, appliedTransform);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 8b9f995..c802c58 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -534,8 +534,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
                   .createKeyedBundle(
                       transformTimers.getKey(),
                       (PCollection)
-                          Iterables.getOnlyElement(transformTimers.getTransform().getInputs())
-                              .getValue())
+                          Iterables.getOnlyElement(
+                              transformTimers.getTransform().getInputs().values()))
                   .add(WindowedValue.valueInGlobalWindow(work))
                   .commit(evaluationContext.now());
           scheduleConsumption(

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 8528905..7c6d2a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -57,7 +57,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
           application) {
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(
-            (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
+            (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs().values()));
     final TransformResult<InputT> result =
         StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
     return new FlattenEvaluator<>(outputBundle, result);

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 04e5aaa..f7fd4cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -162,7 +162,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           evaluationContext.createKeyedBundle(
               structuralKey,
               (PCollection<KV<K, Iterable<V>>>)
-                  Iterables.getOnlyElement(application.getOutputs()).getValue());
+                  Iterables.getOnlyElement(application.getOutputs().values()));
       outputBundles.add(bundle);
       CopyOnAccessInMemoryStateInternals<K> stateInternals =
           (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index ef96a92..ac0b14f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -105,7 +105,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
       this.application = application;
       this.keyCoder =
           getKeyCoder(
-              ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs()).getValue())
+              ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs().values()))
                   .getCoder());
       this.groupingMap = new HashMap<>();
     }
@@ -158,7 +158,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
             evaluationContext.createKeyedBundle(
                 StructuralKey.of(key, keyCoder),
                 (PCollection<KeyedWorkItem<K, V>>)
-                    Iterables.getOnlyElement(application.getOutputs()).getValue());
+                    Iterables.getOnlyElement(application.getOutputs().values()));
         bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
         resultBuilder.addOutput(bundle);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 02b1bed..f9b6eba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ImmutableSet;
 import java.util.HashSet;
-import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
@@ -83,9 +83,9 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
     if (node.isRootNode()) {
       finalized = true;
     } else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) {
-      List<TaggedPValue> outputs = node.getOutputs();
-      for (TaggedPValue output : outputs) {
-        keyedValues.add(output.getValue());
+      Map<TupleTag<?>, PValue> outputs = node.getOutputs();
+      for (PValue output : outputs.values()) {
+        keyedValues.add(output);
       }
     }
   }
@@ -96,8 +96,8 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
     boolean inputsAreKeyed = true;
-    for (TaggedPValue input : producer.getInputs()) {
-      inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input.getValue());
+    for (PValue input : producer.getInputs().values()) {
+      inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input);
     }
     if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass())
         || (isKeyPreserving(producer.getTransform()) && inputsAreKeyed)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b0e97fb..b8a13e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           evaluationContext,
           stepContext,
           application,
-          ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs()).getValue())
+          ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values()))
               .getWindowingStrategy(),
           fn,
           key,
@@ -162,10 +162,10 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
   }
 
-  private Map<TupleTag<?>, PCollection<?>> pcollections(List<TaggedPValue> outputs) {
+  private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
     Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
-    for (TaggedPValue output : outputs) {
-      pcs.put(output.getTag(), (PCollection<?>) output.getValue());
+    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+      pcs.put(output.getKey(), (PCollection<?>) output.getValue());
     }
     return pcs;
   }
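
The pcollections(...) change above converts the tag-keyed output map by iterating entrySet() instead of a List<TaggedPValue>. A self-contained sketch of the same narrowing step (OutputMaps is a hypothetical holder class introduced here for illustration; it assumes, as the factory does, that every output value is a PCollection):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    final class OutputMaps {
      private OutputMaps() {}

      // Narrows each tagged PValue to a PCollection, keyed by the same TupleTag.
      static Map<TupleTag<?>, PCollection<?>> asPCollections(
          Map<TupleTag<?>, PValue> outputs) {
        Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
        for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
          pcs.put(output.getKey(), (PCollection<?>) output.getValue());
        }
        return pcs;
      }
    }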

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 056a0c3..00c0d6a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Iterables;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
@@ -50,7 +49,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypedPValue;
 
@@ -86,13 +85,13 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
   @Override
   public PCollection<? extends InputT> getInput(
-      List<TaggedPValue> inputs, Pipeline p) {
-    return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue();
+      Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+    return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs.values());
   }
 
   @Override
   public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+      Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
     return ReplacementOutputs.tagged(outputs, newOutput);
   }
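
The getInput override above shows the third recurring pattern in this commit: a single-input transform now pulls its sole input with Iterables.getOnlyElement(inputs.values()). A small sketch of that extraction in isolation (SingleInputs and onlyPCollection are hypothetical names for illustration; the unchecked cast carries the same assumption the override factories make about the element type):

    import com.google.common.collect.Iterables;
    import java.util.Map;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;

    final class SingleInputs {
      private SingleInputs() {}

      // Returns the only value of the tag-keyed input map, cast to the expected
      // PCollection element type; throws if there is not exactly one input.
      @SuppressWarnings("unchecked")
      static <T> PCollection<T> onlyPCollection(Map<TupleTag<?>, PValue> inputs) {
        return (PCollection<T>) Iterables.getOnlyElement(inputs.values());
      }
    }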
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 77bebb2..f8fe3d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespace;
@@ -52,7 +53,7 @@ import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
@@ -139,8 +140,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       String stepName = evaluationContext.getStepName(transformOutputWindow.getTransform());
 
       Map<TupleTag<?>, PCollection<?>> taggedValues = new HashMap<>();
-      for (TaggedPValue pv : transformOutputWindow.getTransform().getOutputs()) {
-        taggedValues.put(pv.getTag(), (PCollection<?>) pv.getValue());
+      for (Entry<TupleTag<?>, PValue> pv :
+          transformOutputWindow.getTransform().getOutputs().entrySet()) {
+        taggedValues.put(pv.getKey(), (PCollection<?>) pv.getValue());
       }
       PCollection<?> pc =
           taggedValues

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 0dd8919..6e0a4fc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -48,8 +48,8 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -108,7 +108,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       if (event.getType().equals(EventType.ELEMENT)) {
         UncommittedBundle<T> bundle =
             context.createBundle(
-                (PCollection<T>) Iterables.getOnlyElement(application.getOutputs()).getValue());
+                (PCollection<T>) Iterables.getOnlyElement(application.getOutputs().values()));
         for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
           bundle.add(
               WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
@@ -176,13 +176,13 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
       return p.begin();
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollection<T> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index f0eef58..91e7248 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -121,7 +121,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
       UncommittedBundle<OutputT> output =
           evaluationContext.createBundle(
-              (PCollection<OutputT>) getOnlyElement(transform.getOutputs()).getValue());
+              (PCollection<OutputT>) getOnlyElement(transform.getOutputs().values()));
       UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
       UnboundedReader<OutputT> reader = null;
       try {
@@ -227,7 +227,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       // committing the output.
       if (!reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
         PCollection<OutputT> outputPc =
-            (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
+            (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values());
         evaluationContext.scheduleAfterOutputWouldBeProduced(
             outputPc,
             GlobalWindow.INSTANCE,

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index dc74d3e..8cbe8fc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -64,9 +64,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
           application) {
     PCollection<Iterable<InT>> input =
-        (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs()).getValue();
+        (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs().values());
     final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input,
-        (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
+        (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs().values()));
     return new TransformEvaluator<Iterable<InT>>() {
       private final List<WindowedValue<InT>> elements = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 64e1218..52dc329 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 
 import com.google.common.collect.Iterables;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.sdk.Pipeline;
@@ -35,7 +34,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link PTransformOverrideFactory} that provides overrides for the {@link CreatePCollectionView}
@@ -51,13 +50,13 @@ class ViewOverrideFactory<ElemT, ViewT>
   }
 
   @Override
-  public PCollection<ElemT> getInput(List<TaggedPValue> inputs, Pipeline p) {
-    return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue();
+  public PCollection<ElemT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+    return (PCollection<ElemT>) Iterables.getOnlyElement(inputs.values());
   }
 
   @Override
   public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) {
+      Map<TupleTag<?>, PValue> outputs, PCollectionView<ViewT> newOutput) {
     return Collections.emptyMap();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 942e67c..8c04362 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -61,7 +61,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
@@ -818,13 +819,13 @@ public class WatermarkManager {
 
   private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    List<TaggedPValue> inputs = transform.getInputs();
+    Map<TupleTag<?>, PValue> inputs = transform.getInputs();
     if (inputs.isEmpty()) {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
-    for (TaggedPValue pvalue : inputs) {
+    for (PValue pvalue : inputs.values()) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue.getValue()))
+          getTransformWatermark(graph.getProducer(pvalue))
               .synchronizedProcessingOutputWatermark;
       inputWmsBuilder.add(producerOutputWatermark);
     }
@@ -833,13 +834,13 @@ public class WatermarkManager {
 
   private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    List<TaggedPValue> inputs = transform.getInputs();
+    Map<TupleTag<?>, PValue> inputs = transform.getInputs();
     if (inputs.isEmpty()) {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
-    for (TaggedPValue pvalue : inputs) {
+    for (PValue pvalue : inputs.values()) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue.getValue())).outputWatermark;
+          getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
       inputWatermarksBuilder.add(producerOutputWatermark);
     }
     List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
@@ -1023,8 +1024,8 @@ public class WatermarkManager {
     WatermarkUpdate updateResult = myWatermarks.refresh();
     if (updateResult.isAdvanced()) {
       Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
-      for (TaggedPValue outputPValue : toRefresh.getOutputs()) {
-        additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue.getValue()));
+      for (PValue outputPValue : toRefresh.getOutputs().values()) {
+        additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue));
       }
       return additionalRefreshes;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 8974c67..2550924 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -57,7 +57,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(
-            (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue());
+            (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values()));
     if (fn == null) {
       return PassthroughTransformEvaluator.create(transform, outputBundle);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 1bf5839..b3f92ab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,7 +24,6 @@ import com.google.common.base.Suppliers;
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
@@ -41,7 +40,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms}
@@ -60,12 +59,13 @@ class WriteWithShardingFactory<InputT>
   }
 
   @Override
-  public PCollection<InputT> getInput(List<TaggedPValue> inputs, Pipeline p) {
-    return (PCollection<InputT>) Iterables.getOnlyElement(inputs).getValue();
+  public PCollection<InputT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+    return (PCollection<InputT>) Iterables.getOnlyElement(inputs.values());
   }
 
   @Override
-  public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, PDone newOutput) {
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
     return Collections.emptyMap();
   }