You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:50:04 UTC

[30/51] [abbrv] incubator-beam git commit: Provide local tags in PInput, POutput expansions

Provide local tags in PInput, POutput expansions

Output an ordered collection in PInput and POutput expansions.

This provides information that is necessary to reconstruct a PInput
or POutput from its expansion.

Implement PCollectionList.equals, PCollectionTuple.equals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34373c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34373c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34373c21

Branch: refs/heads/python-sdk
Commit: 34373c21ed67696235d88ef40d50e31c77b84c33
Parents: 6a05d7f
Author: Thomas Groh <tg...@google.com>
Authored: Tue Dec 6 11:03:52 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Dec 20 15:18:55 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectGraphVisitor.java |  18 +--
 .../beam/runners/direct/EvaluationContext.java  |   7 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  16 ++-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../apache/beam/runners/spark/SparkRunner.java  |  13 ++-
 .../beam/sdk/runners/TransformHierarchy.java    |  49 ++++----
 .../transforms/join/KeyedPCollectionTuple.java  |   9 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   4 +-
 .../apache/beam/sdk/values/PCollectionList.java |  65 +++++++----
 .../beam/sdk/values/PCollectionTuple.java       |  28 ++++-
 .../java/org/apache/beam/sdk/values/PDone.java  |   4 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java     |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |  10 ++
 .../org/apache/beam/sdk/values/PValueBase.java  |  11 +-
 .../apache/beam/sdk/values/TaggedPValue.java    |  42 +++++++
 .../sdk/runners/TransformHierarchyTest.java     |  23 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  34 ++++++
 .../beam/sdk/values/PCollectionListTest.java    | 117 +++++++++++++++++++
 .../beam/sdk/values/PCollectionTupleTest.java   |  70 +++++++++++
 20 files changed, 449 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 0283d03..425bbf1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -79,14 +80,16 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    toFinalize.removeAll(node.getInputs());
+    for (TaggedPValue consumed : node.getInputs()) {
+      toFinalize.remove(consumed.getValue());
+    }
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
     stepNames.put(appliedTransform, genStepName());
     if (node.getInputs().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
-      for (PValue value : node.getInputs()) {
-        primitiveConsumers.put(value, appliedTransform);
+      for (TaggedPValue value : node.getInputs()) {
+        primitiveConsumers.put(value.getValue(), appliedTransform);
       }
     }
   }
@@ -96,15 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
     toFinalize.add(value);
 
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
+    if (value instanceof PCollectionView) {
+      views.add((PCollectionView<?>) value);
+    }
     if (!producers.containsKey(value)) {
       producers.put(value, appliedTransform);
     }
-   if (value instanceof PCollectionView) {
-     views.add((PCollectionView<?>) value);
-   }
-   if (!producers.containsKey(value)) {
-     producers.put(value, appliedTransform);
-   }
   }
 
   private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index cb9ddd8..bbcab8e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Instant;
 
 /**
@@ -419,9 +420,9 @@ class EvaluationContext {
     }
     // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
     // the PTransform may produce additional output. It is not done.
-    for (PValue output : transform.getOutput().expand()) {
-      if (output instanceof PCollection) {
-        IsBounded bounded = ((PCollection<?>) output).isBounded();
+    for (TaggedPValue output : transform.getOutput().expand()) {
+      if (output.getValue() instanceof PCollection) {
+        IsBounded bounded = ((PCollection<?>) output.getValue()).isBounded();
         if (bounded.equals(IsBounded.UNBOUNDED)
             && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
           return false;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 65c41e0..32eb692 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -18,11 +18,10 @@
 package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Predicates.in;
-import static com.google.common.collect.Iterables.all;
 
 import com.google.common.collect.ImmutableSet;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
@@ -33,6 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
@@ -83,7 +83,10 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
     if (node.isRootNode()) {
       finalized = true;
     } else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) {
-      keyedValues.addAll(node.getOutputs());
+      List<TaggedPValue> outputs = node.getOutputs();
+      for (TaggedPValue output : outputs) {
+        keyedValues.add(output.getValue());
+      }
     }
   }
 
@@ -92,9 +95,12 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
 
   @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
+    boolean inputsAreKeyed = true;
+    for (TaggedPValue input : producer.getInputs()) {
+      inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input.getValue());
+    }
     if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass())
-        || (isKeyPreserving(producer.getTransform())
-            && all(producer.getInputs(), in(keyedValues)))) {
+        || (isKeyPreserving(producer.getTransform()) && inputsAreKeyed)) {
       keyedValues.add(value);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 247b1cc..7bed751 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -57,7 +57,7 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Instant;
 
 /**
@@ -755,13 +755,14 @@ public class WatermarkManager {
 
   private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    Collection<? extends PValue> inputs = transform.getInput().expand();
+    List<TaggedPValue> inputs = transform.getInput().expand();
     if (inputs.isEmpty()) {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
-    for (PValue pvalue : inputs) {
+    for (TaggedPValue pvalue : inputs) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue)).synchronizedProcessingOutputWatermark;
+          getTransformWatermark(graph.getProducer(pvalue.getValue()))
+              .synchronizedProcessingOutputWatermark;
       inputWmsBuilder.add(producerOutputWatermark);
     }
     return inputWmsBuilder.build();
@@ -769,13 +770,13 @@ public class WatermarkManager {
 
   private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    Collection<? extends PValue> inputs = transform.getInput().expand();
+    List<TaggedPValue> inputs = transform.getInput().expand();
     if (inputs.isEmpty()) {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
-    for (PValue pvalue : inputs) {
+    for (TaggedPValue pvalue : inputs) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
+          getTransformWatermark(graph.getProducer(pvalue.getValue())).outputWatermark;
       inputWatermarksBuilder.add(producerOutputWatermark);
     }
     List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
@@ -959,8 +960,8 @@ public class WatermarkManager {
     WatermarkUpdate updateResult = myWatermarks.refresh();
     if (updateResult.isAdvanced()) {
       Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
-      for (PValue outputPValue : toRefresh.getOutput().expand()) {
-        additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue));
+      for (TaggedPValue outputPValue : toRefresh.getOutput().expand()) {
+        additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue.getValue()));
       }
       return additionalRefreshes;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 3d98b87..92c07bb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.spark.Accumulator;
 import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -282,7 +283,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       if (node.getInputs().size() != 1) {
         return false;
       }
-      PValue input = Iterables.getOnlyElement(node.getInputs());
+      PValue input = Iterables.getOnlyElement(node.getInputs()).getValue();
       if (!(input instanceof PCollection)
           || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
         return false;
@@ -338,7 +339,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       //--- determine if node is bounded/unbounded.
       // usually, the input determines if the PCollection to apply the next transformation to
       // is BOUNDED or UNBOUNDED, meaning RDD/DStream.
-      Collection<? extends PValue> pValues;
+      Collection<TaggedPValue> pValues;
       if (node.getInputs().isEmpty()) {
         // in case of a PBegin, it's the output.
         pValues = node.getOutputs();
@@ -353,15 +354,15 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
               : translator.translateUnbounded(transformClass);
     }
 
-    private PCollection.IsBounded isBoundedCollection(Collection<? extends PValue> pValues) {
+    private PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
       // anything that is not a PCollection, is BOUNDED.
       // For PCollections:
       // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
       // while BOUNDED + UNBOUNDED = UNBOUNDED.
       PCollection.IsBounded isBounded = PCollection.IsBounded.BOUNDED;
-      for (PValue pValue: pValues) {
-        if (pValue instanceof PCollection) {
-          isBounded = isBounded.and(((PCollection) pValue).isBounded());
+      for (TaggedPValue pValue: pValues) {
+        if (pValue.getValue() instanceof PCollection) {
+          isBounded = isBounded.and(((PCollection) pValue.getValue()).isBounded());
         } else {
           isBounded = isBounded.and(PCollection.IsBounded.BOUNDED);
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 33d5231..29e7fcb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * Captures information about a collection of transformations and their
@@ -84,10 +85,12 @@ public class TransformHierarchy {
    */
   public void finishSpecifyingInput() {
     // Inputs must be completely specified before they are consumed by a transform.
-    for (PValue inputValue : current.getInputs()) {
-      inputValue.finishSpecifying();
-      checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue);
-      inputValue.finishSpecifying();
+    for (TaggedPValue inputValue : current.getInputs()) {
+      inputValue.getValue().finishSpecifying();
+      checkState(
+          producers.get(inputValue.getValue()) != null,
+          "Producer unknown for input %s",
+          inputValue.getValue());
     }
   }
 
@@ -103,9 +106,9 @@ public class TransformHierarchy {
    */
   public void setOutput(POutput output) {
     output.finishSpecifyingOutput();
-    for (PValue value : output.expand()) {
-      if (!producers.containsKey(value)) {
-        producers.put(value, current);
+    for (TaggedPValue value : output.expand()) {
+      if (!producers.containsKey(value.getValue())) {
+        producers.put(value.getValue(), current);
       }
     }
     current.setOutput(output);
@@ -133,8 +136,8 @@ public class TransformHierarchy {
    */
   List<Node> getProducingTransforms(POutput output) {
     List<Node> producingTransforms = new ArrayList<>();
-    for (PValue value : output.expand()) {
-      Node producer = getProducer(value);
+    for (TaggedPValue value : output.expand()) {
+      Node producer = getProducer(value.getValue());
       if (producer != null) {
         producingTransforms.add(producer);
       }
@@ -238,8 +241,8 @@ public class TransformHierarchy {
     private boolean returnsOthersOutput() {
       PTransform<?, ?> transform = getTransform();
       if (output != null) {
-        for (PValue outputValue : output.expand()) {
-          if (!getProducer(outputValue).getTransform().equals(transform)) {
+        for (TaggedPValue outputValue : output.expand()) {
+          if (!getProducer(outputValue.getValue()).getTransform().equals(transform)) {
             return true;
           }
         }
@@ -256,8 +259,8 @@ public class TransformHierarchy {
     }
 
     /** Returns the transform input, in unexpanded form. */
-    public Collection<? extends PValue> getInputs() {
-      return input == null ? Collections.<PValue>emptyList() : input.expand();
+    public List<TaggedPValue> getInputs() {
+      return input == null ? Collections.<TaggedPValue>emptyList() : input.expand();
     }
 
     /**
@@ -273,8 +276,8 @@ public class TransformHierarchy {
       // Validate that a primitive transform produces only primitive output, and a composite
       // transform does not produce primitive output.
       Set<Node> outputProducers = new HashSet<>();
-      for (PValue outputValue : output.expand()) {
-        outputProducers.add(getProducer(outputValue));
+      for (TaggedPValue outputValue : output.expand()) {
+        outputProducers.add(getProducer(outputValue.getValue()));
       }
       if (outputProducers.contains(this) && outputProducers.size() != 1) {
         Set<String> otherProducerNames = new HashSet<>();
@@ -296,8 +299,8 @@ public class TransformHierarchy {
     }
 
     /** Returns the transform output, in unexpanded form. */
-    public Collection<? extends PValue> getOutputs() {
-      return output == null ? Collections.<PValue>emptyList() : output.expand();
+    public List<TaggedPValue> getOutputs() {
+      return output == null ? Collections.<TaggedPValue>emptyList() : output.expand();
     }
 
     /**
@@ -320,9 +323,9 @@ public class TransformHierarchy {
 
       if (!isRootNode()) {
         // Visit inputs.
-        for (PValue inputValue : input.expand()) {
-          if (visitedValues.add(inputValue)) {
-            visitor.visitValue(inputValue, getProducer(inputValue));
+        for (TaggedPValue inputValue : input.expand()) {
+          if (visitedValues.add(inputValue.getValue())) {
+            visitor.visitValue(inputValue.getValue(), getProducer(inputValue.getValue()));
           }
         }
       }
@@ -342,9 +345,9 @@ public class TransformHierarchy {
 
       if (!isRootNode()) {
         // Visit outputs.
-        for (PValue pValue : output.expand()) {
-          if (visitedValues.add(pValue)) {
-            visitor.visitValue(pValue, this);
+        for (TaggedPValue pValue : output.expand()) {
+          if (visitedValues.add(pValue.getValue())) {
+            visitor.visitValue(pValue.getValue(), this);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 67b819f..13d4ee1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms.join;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
@@ -28,7 +27,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
@@ -121,10 +120,10 @@ public class KeyedPCollectionTuple<K> implements PInput {
    * any tag-specific information.
    */
   @Override
-  public Collection<? extends PValue> expand() {
-    List<PCollection<?>> retval = new ArrayList<>();
+  public List<TaggedPValue> expand() {
+    List<TaggedPValue> retval = new ArrayList<>();
     for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
-      retval.add(taggedPCollection.pCollection);
+      retval.add(TaggedPValue.of(taggedPCollection.tupleTag, taggedPCollection.pCollection));
     }
     return retval;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
index f1dbb37..9aa4615 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO.Read;
 import org.apache.beam.sdk.transforms.Create;
@@ -64,7 +64,7 @@ public class PBegin implements PInput {
   }
 
   @Override
-  public Collection<? extends PValue> expand() {
+  public List<TaggedPValue> expand() {
     // A PBegin contains no PValues.
     return Collections.emptyList();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
index 4c9e220..e4bb7c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
@@ -18,11 +18,10 @@
 package org.apache.beam.sdk.values;
 
 import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
+import com.google.common.collect.Iterables;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -115,9 +114,9 @@ public class PCollectionList<T> implements PInput, POutput {
           "PCollections come from different Pipelines");
     }
     return new PCollectionList<>(pipeline,
-        new ImmutableList.Builder<PCollection<T>>()
+        ImmutableList.<TaggedPValue>builder()
             .addAll(pcollections)
-            .add(pc)
+            .add(Iterables.getOnlyElement(pc.expand()))
             .build());
   }
 
@@ -130,15 +129,16 @@ public class PCollectionList<T> implements PInput, POutput {
    * part of the same {@link Pipeline}.
    */
   public PCollectionList<T> and(Iterable<PCollection<T>> pcs) {
-    List<PCollection<T>> copy = new ArrayList<>(pcollections);
+    ImmutableList.Builder<TaggedPValue> builder = ImmutableList.builder();
+    builder.addAll(pcollections);
     for (PCollection<T> pc : pcs) {
       if (pc.getPipeline() != pipeline) {
         throw new IllegalArgumentException(
             "PCollections come from different Pipelines");
       }
-      copy.add(pc);
+      builder.add(Iterables.getOnlyElement(pc.expand()));
     }
-    return new PCollectionList<>(pipeline, copy);
+    return new PCollectionList<>(pipeline, builder.build());
   }
 
   /**
@@ -155,7 +155,9 @@ public class PCollectionList<T> implements PInput, POutput {
    * {@code [0..size()-1]}.
    */
   public PCollection<T> get(int index) {
-    return pcollections.get(index);
+    @SuppressWarnings("unchecked") // Type-safe by construction
+    PCollection<T> value = (PCollection<T>) pcollections.get(index).getValue();
+    return value;
   }
 
   /**
@@ -163,7 +165,13 @@ public class PCollectionList<T> implements PInput, POutput {
    * {@link PCollectionList}.
    */
   public List<PCollection<T>> getAll() {
-    return pcollections;
+    ImmutableList.Builder<PCollection<T>> res = ImmutableList.builder();
+    for (TaggedPValue value : pcollections) {
+      @SuppressWarnings("unchecked") // Type-safe by construction
+      PCollection<T> typedValue = (PCollection<T>) value.getValue();
+      res.add(typedValue);
+    }
+    return res.build();
   }
 
   /**
@@ -192,15 +200,16 @@ public class PCollectionList<T> implements PInput, POutput {
   // Internal details below here.
 
   final Pipeline pipeline;
-  final List<PCollection<T>> pcollections;
+  // ImmutableList has a defined iteration order.
+  final List<TaggedPValue> pcollections;
 
   PCollectionList(Pipeline pipeline) {
-    this(pipeline, new ArrayList<PCollection<T>>());
+    this(pipeline, ImmutableList.<TaggedPValue>of());
   }
 
-  PCollectionList(Pipeline pipeline, List<PCollection<T>> pcollections) {
+  PCollectionList(Pipeline pipeline, List<TaggedPValue> values) {
     this.pipeline = pipeline;
-    this.pcollections = Collections.unmodifiableList(pcollections);
+    this.pcollections = ImmutableList.copyOf(values);
   }
 
   @Override
@@ -209,14 +218,16 @@ public class PCollectionList<T> implements PInput, POutput {
   }
 
   @Override
-  public Collection<? extends PValue> expand() {
+  public List<TaggedPValue> expand() {
     return pcollections;
   }
 
   @Override
   public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
     int i = 0;
-    for (PCollection<T> pc : pcollections) {
+    for (TaggedPValue tpv : pcollections) {
+      @SuppressWarnings("unchecked")
+      PCollection<T> pc = (PCollection<T>) tpv.getValue();
       pc.recordAsOutput(transform, "out" + i);
       i++;
     }
@@ -224,15 +235,29 @@ public class PCollectionList<T> implements PInput, POutput {
 
   @Override
   public void finishSpecifying() {
-    for (PCollection<T> pc : pcollections) {
-      pc.finishSpecifying();
+    for (TaggedPValue pc : pcollections) {
+      pc.getValue().finishSpecifying();
     }
   }
 
   @Override
   public void finishSpecifyingOutput() {
-    for (PCollection<T> pc : pcollections) {
-      pc.finishSpecifyingOutput();
+    for (TaggedPValue pc : pcollections) {
+      pc.getValue().finishSpecifyingOutput();
     }
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PCollectionList)) {
+      return false;
+    }
+    PCollectionList that = (PCollectionList) other;
+    return this.pipeline.equals(that.pipeline) && this.pcollections.equals(that.pcollections);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.pipeline, this.pcollections);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 727d882..6afe59e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.values;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -178,7 +180,7 @@ public class PCollectionTuple implements PInput, POutput {
   /////////////////////////////////////////////////////////////////////////////
   // Internal details below here.
 
-  Pipeline pipeline;
+  final Pipeline pipeline;
   final Map<TupleTag<?>, PCollection<?>> pcollectionMap;
 
   PCollectionTuple(Pipeline pipeline) {
@@ -232,8 +234,12 @@ public class PCollectionTuple implements PInput, POutput {
   }
 
   @Override
-  public Collection<? extends PValue> expand() {
-    return pcollectionMap.values();
+  public List<TaggedPValue> expand() {
+    ImmutableList.Builder<TaggedPValue> values = ImmutableList.builder();
+    for (Map.Entry<TupleTag<?>, PCollection<?>> entry : pcollectionMap.entrySet()) {
+      values.add(TaggedPValue.of(entry.getKey(), entry.getValue()));
+    }
+    return values.build();
   }
 
   @Override
@@ -261,4 +267,18 @@ public class PCollectionTuple implements PInput, POutput {
       pc.finishSpecifyingOutput();
     }
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PCollectionTuple)) {
+      return false;
+    }
+    PCollectionTuple that = (PCollectionTuple) other;
+    return this.pipeline.equals(that.pipeline) && this.pcollectionMap.equals(that.pcollectionMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.pipeline, this.pcollectionMap);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
index 9e8cae4..b4a3025 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.PTransform;
 
@@ -36,7 +36,7 @@ public class PDone extends POutputValueBase {
   }
 
   @Override
-  public Collection<? extends PValue> expand() {
+  public List<TaggedPValue> expand() {
     // A PDone contains no PValues.
     return Collections.emptyList();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
index f938aeb..a27b939 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.Collection;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 
 /**
@@ -43,7 +43,7 @@ public interface PInput {
    *
    * <p>Not intended to be invoked directly by user code.
    */
-  Collection<? extends PValue> expand();
+  List<TaggedPValue> expand();
 
   /**
    * After building, finalizes this {@code PInput} to make it ready for

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
index 27a280f..e5d4504 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.Collection;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -45,7 +45,7 @@ public interface POutput {
    *
    * <p>Not intended to be invoked directly by user code.
    */
-  Collection<? extends PValue> expand();
+  List<TaggedPValue> expand();
 
   /**
    * Records that this {@code POutput} is an output of the given

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
index 0cee2ca..e6dbaf7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.values;
 
+import java.util.List;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 
@@ -36,4 +37,13 @@ public interface PValue extends POutput, PInput {
    * <p>For internal use only.
    */
   AppliedPTransform<?, ?, ?> getProducingTransformInternal();
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>A {@link PValue} always expands into itself. Calling {@link #expand()} on a PValue is almost
+   * never appropriate.
+   */
+  @Deprecated
+  List<TaggedPValue> expand();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 685e32f..3a10d5d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.values;
 
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -91,6 +91,11 @@ public abstract class PValueBase extends POutputValueBase implements PValue {
   private String name;
 
   /**
+   * A local {@link TupleTag} used in the expansion of this {@link PValueBase}.
+   */
+  private TupleTag<?> tag = new TupleTag<>();
+
+  /**
    * Whether this {@link PValueBase} has been finalized, and its core
    * properties, e.g., name, can no longer be changed.
    */
@@ -128,8 +133,8 @@ public abstract class PValueBase extends POutputValueBase implements PValue {
   }
 
   @Override
-  public Collection<? extends PValue> expand() {
-    return Collections.singletonList(this);
+  public final List<TaggedPValue> expand() {
+    return Collections.singletonList(TaggedPValue.of(tag, this));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
new file mode 100644
index 0000000..458d16f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TaggedPValue.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.beam.sdk.values;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A (TupleTag, PValue) pair used in the expansion of a {@link PInput} or {@link POutput}.
+ */
+@AutoValue
+public abstract class TaggedPValue {
+  public static TaggedPValue of(TupleTag<?> tag, PValue value) {
+    return new AutoValue_TaggedPValue(tag, value);
+  }
+
+  /**
+   * Returns the local tag associated with the {@link PValue}.
+   */
+  public abstract TupleTag<?> getTag();
+
+  /**
+   * Returns the {@link PValue}.
+   */
+  public abstract PValue getValue();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 2327459..d790d39 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -22,7 +22,10 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.io.CountingSource;
@@ -38,6 +41,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
@@ -181,14 +185,16 @@ public class TransformHierarchyTest {
     assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
     hierarchy.setOutput(created);
     hierarchy.popNode();
-    assertThat(primitiveNode.getOutputs(), Matchers.<PValue>containsInAnyOrder(created));
-    assertThat(primitiveNode.getInputs(), Matchers.<PValue>emptyIterable());
+    assertThat(
+        fromTaggedValues(primitiveNode.getOutputs()), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(primitiveNode.getInputs(), Matchers.<TaggedPValue>emptyIterable());
     assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read));
     assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode));
 
     hierarchy.setOutput(created);
     // The composite is listed as outputting a PValue created by the contained primitive
-    assertThat(compositeNode.getOutputs(), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(
+        fromTaggedValues(compositeNode.getOutputs()), Matchers.<PValue>containsInAnyOrder(created));
     // The producer of that PValue is still the primitive in which it is first output
     assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();
@@ -226,4 +232,15 @@ public class TransformHierarchyTest {
     assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created, mapped));
     assertThat(visitedValuesInVisitor, equalTo(visitedValues));
   }
+
+  private static List<PValue> fromTaggedValues(List<TaggedPValue> taggedValues) {
+    return Lists.transform(
+        taggedValues,
+        new Function<TaggedPValue, PValue>() {
+          @Override
+          public PValue apply(TaggedPValue input) {
+            return input.getValue();
+          }
+        });
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3a47fc7..fa8874c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertArrayEquals;
@@ -50,6 +51,7 @@ import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -86,6 +88,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.MutableDateTime;
@@ -864,6 +867,37 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  public void testMultiOutputAppliedMultipleTimesDifferentOutputs() {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PCollection<Long> longs = pipeline.apply(CountingInput.unbounded());
+
+    TupleTag<Long> mainOut = new TupleTag<>();
+    final TupleTag<String> sideOutOne = new TupleTag<>();
+    final TupleTag<Integer> sideOutTwo = new TupleTag<>();
+    DoFn<Long, Long> fn =
+        new DoFn<Long, Long>() {
+          @ProcessElement
+          public void processElement(ProcessContext cxt) {
+            cxt.output(cxt.element());
+            cxt.sideOutput(sideOutOne, Long.toString(cxt.element()));
+            cxt.sideOutput(sideOutTwo, Long.valueOf(cxt.element()).intValue());
+          }
+        };
+
+    ParDo.BoundMulti<Long, Long> parDo =
+        ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(sideOutOne).and(sideOutTwo));
+    PCollectionTuple firstApplication = longs.apply("first", parDo);
+    PCollectionTuple secondApplication = longs.apply("second", parDo);
+    assertThat(firstApplication, not(equalTo(secondApplication)));
+    assertThat(
+        firstApplication.getAll().keySet(),
+        Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo));
+    assertThat(
+        secondApplication.getAll().keySet(),
+        Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testParDoInCustomTransform() {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
index f76bf7e..2482f32 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
@@ -18,10 +18,22 @@
 package org.apache.beam.sdk.values;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.testing.EqualsTester;
 import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -44,4 +56,109 @@ public class PCollectionListTest {
               + "or must first call empty(Pipeline)"));
     }
   }
+
+  @Test
+  public void testIterationOrder() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> createOne = p.apply("CreateOne", Create.of(1L, 2L, 3L));
+    PCollection<Long> boundedCount = p.apply("CountBounded", CountingInput.upTo(23L));
+    PCollection<Long> unboundedCount = p.apply("CountUnbounded", CountingInput.unbounded());
+    PCollection<Long> createTwo = p.apply("CreateTwo", Create.of(-1L, -2L));
+    PCollection<Long> maxRecordsCount =
+        p.apply("CountLimited", CountingInput.unbounded().withMaxNumRecords(22L));
+
+    ImmutableList<PCollection<Long>> counts =
+        ImmutableList.of(boundedCount, maxRecordsCount, unboundedCount);
+    // Build a PCollectionList from a list. This should have the same order as the input list.
+    PCollectionList<Long> pcList = PCollectionList.of(counts);
+    // contains() is the order-dependent matcher, unlike containsInAnyOrder()
+    assertThat(
+        pcList.getAll(),
+        contains(boundedCount, maxRecordsCount, unboundedCount));
+
+    // A list that is expanded with builder methods has the added value at the end
+    PCollectionList<Long> withOneCreate = pcList.and(createTwo);
+    assertThat(
+        withOneCreate.getAll(), contains(boundedCount, maxRecordsCount, unboundedCount, createTwo));
+
+    // Lists that are built entirely from the builder return outputs in the order they were added
+    PCollectionList<Long> fromEmpty =
+        PCollectionList.<Long>empty(p)
+            .and(unboundedCount)
+            .and(createOne)
+            .and(ImmutableList.of(boundedCount, maxRecordsCount));
+    assertThat(
+        fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxRecordsCount));
+
+    List<TaggedPValue> expansion = fromEmpty.expand();
+    // TaggedPValues are stable between expansions
+    assertThat(expansion, equalTo(fromEmpty.expand()));
+    // TaggedPValues are equivalent between equivalent lists
+    assertThat(
+        expansion,
+        equalTo(
+            PCollectionList.of(unboundedCount)
+                .and(createOne)
+                .and(boundedCount)
+                .and(maxRecordsCount)
+                .expand()));
+
+    List<PCollection<Long>> expectedList =
+        ImmutableList.of(unboundedCount, createOne, boundedCount, maxRecordsCount);
+    for (int i = 0; i < expansion.size(); i++) {
+      assertThat(
+          "Index " + i + " should have equal PValue",
+          expansion.get(i).getValue(),
+          Matchers.<PValue>equalTo(expectedList.get(i)));
+    }
+  }
+
+  @Test
+  public void testEquals() {
+    Pipeline p = TestPipeline.create();
+    PCollection<String> first = p.apply("Meta", Create.of("foo", "bar"));
+    PCollection<String> second = p.apply("Pythonic", Create.of("spam, ham"));
+    PCollection<String> third = p.apply("Syntactic", Create.of("eggs", "baz"));
+
+    EqualsTester tester = new EqualsTester();
+    tester.addEqualityGroup(PCollectionList.empty(p), PCollectionList.empty(p));
+    tester.addEqualityGroup(PCollectionList.of(first).and(second));
+    // All of these construction paths should produce equal lists
+    tester.addEqualityGroup(
+        PCollectionList.of(first).and(second).and(third),
+        PCollectionList.of(first).and(second).and(third),
+        PCollectionList.<String>empty(p).and(first).and(second).and(third),
+        PCollectionList.of(ImmutableList.of(first, second, third)),
+        PCollectionList.of(first).and(ImmutableList.of(second, third)),
+        PCollectionList.of(ImmutableList.of(first, second)).and(third));
+    // Order is considered
+    tester.addEqualityGroup(PCollectionList.of(first).and(third).and(second));
+    tester.addEqualityGroup(PCollectionList.empty(TestPipeline.create()));
+
+    tester.testEquals();
+  }
+
+  @Test
+  public void testExpansionOrderWithDuplicates() {
+    TestPipeline p = TestPipeline.create();
+    BoundedCountingInput count = CountingInput.upTo(10L);
+    PCollection<Long> firstCount = p.apply("CountFirst", count);
+    PCollection<Long> secondCount = p.apply("CountSecond", count);
+
+    PCollectionList<Long> counts =
+        PCollectionList.of(firstCount).and(secondCount).and(firstCount).and(firstCount);
+
+    ImmutableList<PCollection<Long>> expectedOrder =
+        ImmutableList.of(firstCount, secondCount, firstCount, firstCount);
+    PCollectionList<Long> reconstructed = PCollectionList.empty(p);
+    assertThat(counts.expand(), hasSize(4));
+    for (int i = 0; i < 4; i++) {
+      PValue value = counts.expand().get(i).getValue();
+      assertThat(
+          "Index " + i + " should be equal", value,
+          Matchers.<PValue>equalTo(expectedOrder.get(i)));
+      reconstructed = reconstructed.and((PCollection<Long>) value);
+    }
+    assertThat(reconstructed, equalTo(counts));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index b5351da..7d767cf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -17,21 +17,31 @@
  */
 package org.apache.beam.sdk.values;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.testing.EqualsTester;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -93,4 +103,64 @@ public final class PCollectionTupleTest implements Serializable {
     pipeline.run();
   }
 
+  @Test
+  public void testEquals() {
+    TestPipeline p = TestPipeline.create();
+    TupleTag<Long> longTag = new TupleTag<>();
+    PCollection<Long> longs = p.apply(CountingInput.unbounded());
+    TupleTag<String> strTag = new TupleTag<>();
+    PCollection<String> strs = p.apply(Create.of("foo", "bar"));
+
+    EqualsTester tester = new EqualsTester();
+    // Empty tuples in the same pipeline are equal
+    tester.addEqualityGroup(PCollectionTuple.empty(p), PCollectionTuple.empty(p));
+
+    tester.addEqualityGroup(PCollectionTuple.of(longTag, longs).and(strTag, strs),
+        PCollectionTuple.of(longTag, longs).and(strTag, strs));
+
+    tester.addEqualityGroup(PCollectionTuple.of(longTag, longs));
+    tester.addEqualityGroup(PCollectionTuple.of(strTag, strs));
+
+    TestPipeline otherPipeline = TestPipeline.create();
+    // Empty tuples in different pipelines are not equal
+    tester.addEqualityGroup(PCollectionTuple.empty(otherPipeline));
+    tester.testEquals();
+  }
+
+  @Test
+  public void testExpandHasMatchingTags() {
+    TupleTag<Integer> intTag = new TupleTag<>();
+    TupleTag<String> strTag = new TupleTag<>();
+    TupleTag<Long> longTag = new TupleTag<>();
+
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> longs = p.apply(CountingInput.upTo(100L));
+    PCollection<String> strs = p.apply(Create.of("foo", "bar", "baz"));
+    PCollection<Integer> ints = longs.apply(MapElements.via(new SimpleFunction<Long, Integer>() {
+      @Override
+      public Integer apply(Long input) {
+        return input.intValue();
+      }
+    }));
+
+    Map<TupleTag<?>, PCollection<?>> pcsByTag =
+        ImmutableMap.<TupleTag<?>, PCollection<?>>builder()
+            .put(strTag, strs)
+            .put(intTag, ints)
+            .put(longTag, longs)
+            .build();
+    PCollectionTuple tuple =
+        PCollectionTuple.of(intTag, ints).and(longTag, longs).and(strTag, strs);
+    assertThat(tuple.getAll(), equalTo(pcsByTag));
+    PCollectionTuple reconstructed = PCollectionTuple.empty(p);
+    for (TaggedPValue taggedValue : tuple.expand()) {
+      TupleTag<?> tag = taggedValue.getTag();
+      PValue value = taggedValue.getValue();
+      assertThat("The tag should map back to the value", tuple.get(tag), equalTo(value));
+      assertThat(value, Matchers.<PValue>equalTo(pcsByTag.get(tag)));
+      reconstructed = reconstructed.and(tag, (PCollection) value);
+    }
+
+    assertThat(reconstructed, equalTo(tuple));
+  }
 }