Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/22 23:34:52 UTC

[1/5] incubator-beam git commit: Update python-sdk pom.xml version and fix up merge errors.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d5c0175ca -> a46091a79


Update python-sdk pom.xml version and fix up merge errors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cce0952c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cce0952c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cce0952c

Branch: refs/heads/python-sdk
Commit: cce0952c903c58a7030ba89363185c301663eb5a
Parents: 07a0728
Author: Ahmet Altay <al...@google.com>
Authored: Thu Dec 22 11:53:12 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:41 2016 -0800

----------------------------------------------------------------------
 .../flink/translation/FlinkBatchTransformTranslators.java        | 4 ++--
 .../beam/runners/spark/translation/TransformTranslator.java      | 1 -
 sdks/python/pom.xml                                              | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cce0952c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 1a161b0..eb625b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -536,7 +536,7 @@ class FlinkBatchTransformTranslators {
 
       FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkDoFnFunction<>(
-              oldDoFn,
+              doFn,
               context.getOutput(transform).getWindowingStrategy(),
               sideInputStrategies,
               context.getPipelineOptions());
@@ -613,7 +613,7 @@ class FlinkBatchTransformTranslators {
       @SuppressWarnings("unchecked")
       FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkMultiOutputDoFnFunction(
-              oldDoFn,
+              doFn,
               windowingStrategy,
               sideInputStrategies,
               context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cce0952c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 8430bbf..5dd6beb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -56,7 +56,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CombineFnUtil;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cce0952c/sdks/python/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml
index 797966e..cc90969 100644
--- a/sdks/python/pom.xml
+++ b/sdks/python/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-parent</artifactId>
-    <version>0.4.0-incubating-SNAPSHOT</version>
+    <version>0.5.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 


[5/5] incubator-beam git commit: Closes #1690

Posted by dh...@apache.org.
Closes #1690


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a46091a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a46091a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a46091a7

Branch: refs/heads/python-sdk
Commit: a46091a7959eab6de59327f379ff46a245b3090a
Parents: d5c0175 cce0952
Author: Dan Halperin <dh...@google.com>
Authored: Thu Dec 22 15:34:42 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:42 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/apex/ApexYarnLauncher.java     |  6 +-
 .../translation/ParDoBoundTranslatorTest.java   | 32 ++++----
 .../beam/runners/direct/DirectGraphVisitor.java | 21 ------
 .../beam/runners/direct/DirectRunner.java       |  1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +-------
 .../direct/FlattenEvaluatorFactoryTest.java     |  2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 ++++-
 .../FlinkBatchTransformTranslators.java         |  4 +-
 .../spark/translation/TransformTranslator.java  |  1 -
 .../beam/runners/spark/ForceStreamingTest.java  | 41 +---------
 .../main/java/org/apache/beam/sdk/Pipeline.java |  3 +
 .../beam/sdk/runners/TransformHierarchy.java    | 45 ++++++-----
 .../transforms/join/KeyedPCollectionTuple.java  | 32 ++++----
 .../java/org/apache/beam/sdk/values/PBegin.java |  5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java       | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java |  9 ---
 .../org/apache/beam/sdk/values/POutput.java     | 20 ++---
 .../beam/sdk/values/POutputValueBase.java       |  4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 ++++
 .../org/apache/beam/sdk/values/PValueBase.java  |  3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++++++++++---------
 .../sdk/runners/TransformHierarchyTest.java     | 34 +++++----
 .../apache/beam/sdk/transforms/ParDoTest.java   |  7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  7 +-
 sdks/python/pom.xml                             |  2 +-
 26 files changed, 194 insertions(+), 253 deletions(-)
----------------------------------------------------------------------



[3/5] incubator-beam git commit: Use CountingSource in ForceStreamingTest

Posted by dh...@apache.org.
Use CountingSource in ForceStreamingTest

Removes the requirement to have a FakeUnboundedSource, and the read is now
fully specified.

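As a rough illustration of the pattern the test now relies on, here is a
standalone sketch that builds the same capped read from CountingSource,
using only Beam classes visible in this diff. The class name, main method,
and default pipeline options are assumptions for illustration; the real
test constructs its pipeline through the Spark runner's test harness.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
    import org.apache.beam.sdk.io.CountingSource;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CountingSourceReadSketch {
      public static void main(String[] args) {
        // Assumes a default runner (e.g. the DirectRunner) is on the classpath.
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // Wrapping an unbounded source with withMaxNumRecords yields a
        // BoundedReadFromUnboundedSource; -1 means "no record limit", matching
        // the usage in ForceStreamingTest.
        BoundedReadFromUnboundedSource<Long> boundedRead =
            Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);

        // The test applies this read and then asserts that the runner detects
        // the wrapper and adapts it back to an unbounded read.
        pipeline.apply(boundedRead);
      }
    }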

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6d972629
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6d972629
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6d972629

Branch: refs/heads/python-sdk
Commit: 6d9726290f61dd97f81de47c4070fb27e7e07432
Parents: d5c0175
Author: Thomas Groh <tg...@google.com>
Authored: Tue Dec 20 14:23:21 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:41 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/spark/ForceStreamingTest.java  | 39 +-------------------
 1 file changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d972629/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index eb17eea..1b2ff08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -59,7 +58,7 @@ public class ForceStreamingTest {
     // apply the BoundedReadFromUnboundedSource.
     @SuppressWarnings("unchecked")
     BoundedReadFromUnboundedSource boundedRead =
-        Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+        Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
     //noinspection unchecked
     pipeline.apply(boundedRead);
 
@@ -86,38 +85,4 @@ public class ForceStreamingTest {
     }
 
   }
-
-  /**
-   * A fake {@link UnboundedSource} to satisfy the compiler.
-   */
-  private static class FakeUnboundedSource extends UnboundedSource {
-
-    @Override
-    public List<? extends UnboundedSource> generateInitialSplits(
-        int desiredNumSplits,
-        PipelineOptions options) throws Exception {
-      return null;
-    }
-
-    @Override
-    public UnboundedReader createReader(
-        PipelineOptions options,
-        CheckpointMark checkpointMark) throws IOException {
-      return null;
-    }
-
-    @Override
-    public Coder getCheckpointMarkCoder() {
-      return null;
-    }
-
-    @Override
-    public void validate() { }
-
-    @Override
-    public Coder getDefaultOutputCoder() {
-      return null;
-    }
-  }
-
 }


[2/5] incubator-beam git commit: BEAM-1203 Fixed exception when creating zip entries during Apex YARN launch

Posted by dh...@apache.org.
BEAM-1203 Fixed exception when creating zip entries during Apex YARN launch

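The fix skips recreating directory entries that the target jar already
contains: java.nio.file.Files.createDirectory throws
FileAlreadyExistsException for an entry that is already present, and
META-INF/ typically already exists in the bundle. A minimal standalone
sketch of that guard against a throwaway jar (the temp-file setup and
class name are illustrative only, not part of ApexYarnLauncher):

    import java.io.IOException;
    import java.net.URI;
    import java.nio.file.FileSystem;
    import java.nio.file.FileSystems;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Collections;

    public class ZipDirEntrySketch {
      public static void main(String[] args) throws IOException {
        // Throwaway jar path; the zip filesystem provider creates the archive.
        Path jar = Files.createTempFile("zip-entry-sketch", ".jar");
        Files.delete(jar);
        URI uri = URI.create("jar:" + jar.toUri());

        try (FileSystem zipfs =
            FileSystems.newFileSystem(uri, Collections.singletonMap("create", "true"))) {
          String relativePath = "META-INF/";
          // Skip entries that already exist in the archive rather than calling
          // Files.createDirectory unconditionally, mirroring the guard added in
          // ApexYarnLauncher.
          if (!relativePath.equals("META-INF/")) {
            Files.createDirectory(zipfs.getPath(relativePath));
          }
        }
      }
    }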

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07a07280
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07a07280
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07a07280

Branch: refs/heads/python-sdk
Commit: 07a07280f0adc0b83476665733306c49d05f93ad
Parents: 99c49b0
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Dec 21 17:56:55 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:41 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/apex/ApexYarnLauncher.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07a07280/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
index 0ae4cc7..a2d88f4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
@@ -260,8 +260,10 @@ public class ApexYarnLauncher {
             if (!relativePath.endsWith("/")) {
               relativePath += "/";
             }
-            final Path dstDir = zipfs.getPath(relativePath);
-            Files.createDirectory(dstDir);
+            if (!relativePath.equals("META-INF/")) {
+              final Path dstDir = zipfs.getPath(relativePath);
+              Files.createDirectory(dstDir);
+            }
           }
           return super.preVisitDirectory(dir, attrs);
         }


[4/5] incubator-beam git commit: Add Parameters to finishSpecifying

Posted by dh...@apache.org.
Add Parameters to finishSpecifying

Remove the need to use getProducingTransformInternal in TypedPValue.

Ensure that all nodes are finished specifying before a call to
TransformHierarchy#visit, so that every node is fully specified without
requiring the Pipeline or Runner to do so explicitly.

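One visible consequence, reflected in the test changes below: Coder
inference now runs when an output is finished specifying with its
producing PInput and PTransform, so outputs whose Coder cannot be
inferred (for example, flattening an empty PCollectionList) must have a
Coder set explicitly. A rough standalone sketch of that pattern, using
classes that appear in this diff (the class name, main method, and
default pipeline options are assumptions for illustration):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    public class ExplicitCoderSketch {
      public static void main(String[] args) {
        // Assumes a default runner (e.g. the DirectRunner) is on the classpath.
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        // Flattening an empty PCollectionList leaves nothing to infer a Coder
        // from, so the updated tests set one explicitly before the pipeline is
        // traversed.
        PCollection<String> empty =
            PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections());
        empty.setCoder(StringUtf8Coder.of());
      }
    }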

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99c49b04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99c49b04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99c49b04

Branch: refs/heads/python-sdk
Commit: 99c49b040aad52fc6558d70fee65d74f59b420de
Parents: 6d97262
Author: Thomas Groh <tg...@google.com>
Authored: Thu Dec 8 14:33:36 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:41 2016 -0800

----------------------------------------------------------------------
 .../translation/ParDoBoundTranslatorTest.java   | 32 ++++----
 .../beam/runners/direct/DirectGraphVisitor.java | 21 ------
 .../beam/runners/direct/DirectRunner.java       |  1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +-------
 .../direct/FlattenEvaluatorFactoryTest.java     |  2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 ++++-
 .../beam/runners/spark/ForceStreamingTest.java  |  2 -
 .../main/java/org/apache/beam/sdk/Pipeline.java |  3 +
 .../beam/sdk/runners/TransformHierarchy.java    | 45 ++++++-----
 .../transforms/join/KeyedPCollectionTuple.java  | 32 ++++----
 .../java/org/apache/beam/sdk/values/PBegin.java |  5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java       | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java |  9 ---
 .../org/apache/beam/sdk/values/POutput.java     | 20 ++---
 .../beam/sdk/values/POutputValueBase.java       |  4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 ++++
 .../org/apache/beam/sdk/values/PValueBase.java  |  3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++++++++++---------
 .../sdk/runners/TransformHierarchyTest.java     | 34 +++++----
 .../apache/beam/sdk/transforms/ParDoTest.java   |  7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  7 +-
 22 files changed, 185 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index fa94b2a..f88a94d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
@@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest {
                 Arrays.asList(sideInput1, sideInput2),
                 Arrays.<TupleTag<String>>asList())));
 
-     outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
-     ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
-
-     HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
-         "processing: -42: [11, 222]", "processing: 666: [11, 222]");
-     long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
-     while (System.currentTimeMillis() < timeout) {
-       if (EmbeddedCollector.RESULTS.containsAll(expected)) {
-         break;
-       }
-       LOG.info("Waiting for expected results.");
-       Thread.sleep(SLEEP_MILLIS);
-     }
-     result.cancel();
-     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+    outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+    outputs.get(sideOutputTag).setCoder(VoidCoder.of());
+    ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+    HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
+        "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(SLEEP_MILLIS);
+    }
+    result.cancel();
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
   private static class TestMultiOutputWithSideInputsFn extends DoFn<Integer, String> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 425bbf1..7e6845d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -51,7 +50,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
   private Set<PCollectionView<?>> views = new HashSet<>();
   private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
   private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-  private Set<PValue> toFinalize = new HashSet<>();
   private int numTransforms = 0;
   private boolean finalized = false;
 
@@ -80,9 +78,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    for (TaggedPValue consumed : node.getInputs()) {
-      toFinalize.remove(consumed.getValue());
-    }
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
     stepNames.put(appliedTransform, genStepName());
     if (node.getInputs().isEmpty()) {
@@ -96,8 +91,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
  @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
-    toFinalize.add(value);
-
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
     if (value instanceof PCollectionView) {
       views.add((PCollectionView<?>) value);
@@ -118,20 +111,6 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
   }
 
   /**
-   * Returns all of the {@link PValue PValues} that have been produced but not consumed. These
-   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
-   * {@link Pipeline} is executed.
-   */
-  public void finishSpecifyingRemainder() {
-    checkState(
-        finalized,
-        "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed");
-    for (PValue unfinalized : toFinalize) {
-      unfinalized.finishSpecifying();
-    }
-  }
-
-  /**
    * Get the graph constructed by this {@link DirectGraphVisitor}, which provides
    * lookups for producers and consumers of {@link PValue PValues}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7e6ea15..5793b00 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -301,7 +301,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     MetricsEnvironment.setMetricsSupported(true);
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
     pipeline.traverseTopologically(graphVisitor);
-    graphVisitor.finishSpecifyingRemainder();
 
     @SuppressWarnings("rawtypes")
     KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index c3bbe2d..01d11a3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -28,6 +27,7 @@ import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
@@ -111,6 +111,7 @@ public class DirectGraphVisitorTest implements Serializable {
     FlattenPCollectionList<String> flatten = Flatten.pCollections();
     PCollectionList<String> emptyList = PCollectionList.empty(p);
     PCollection<String> empty = emptyList.apply(flatten);
+    empty.setCoder(StringUtf8Coder.of());
     p.traverseTopologically(visitor);
     DirectGraph graph = visitor.getGraph();
     assertThat(
@@ -177,27 +178,6 @@ public class DirectGraphVisitorTest implements Serializable {
   }
 
   @Test
-  public void getUnfinalizedPValuesContainsDanglingOutputs() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @ProcessElement
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-
-    assertThat(transformed.isFinishedSpecifyingInternal(), is(false));
-
-    p.traverseTopologically(visitor);
-    visitor.finishSpecifyingRemainder();
-    assertThat(transformed.isFinishedSpecifyingInternal(), is(true));
-  }
-
-  @Test
   public void getStepNamesContainsAllTransforms() {
     PCollection<String> created = p.apply(Create.of("1", "2", "3"));
     PCollection<String> transformed =
@@ -254,12 +234,4 @@ public class DirectGraphVisitorTest implements Serializable {
     thrown.expectMessage("get a graph");
     visitor.getGraph();
   }
-
-  @Test
-  public void finishSpecifyingRemainderWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("finishSpecifyingRemainder");
-    visitor.finishSpecifyingRemainder();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index cda68f0..e07c9f9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.Iterables;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -124,6 +125,7 @@ public class FlattenEvaluatorFactoryTest {
     PCollectionList<Integer> list = PCollectionList.empty(p);
 
     PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
+    flattened.setCoder(VarIntCoder.of());
 
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     when(evaluationContext.createBundle(flattened))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index a1fb81b..8fac534 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 
 import java.util.Collections;
 import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -111,7 +113,13 @@ public class KeyedPValueTrackingVisitorTest {
                                 KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))))));
 
     PCollection<KeyedWorkItem<String, KV<String, Integer>>> unkeyed =
-        input.apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()));
+        input
+            .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()))
+            .setCoder(
+                KeyedWorkItemCoder.of(
+                    StringUtf8Coder.of(),
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+                    GlobalWindow.Coder.INSTANCE));
 
     p.traverseTopologically(visitor);
     assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
@@ -139,7 +147,12 @@ public class KeyedPValueTrackingVisitorTest {
     PCollection<KeyedWorkItem<String, KV<String, Integer>>> keyed =
         input
             .apply(GroupByKey.<String, WindowedValue<KV<String, Integer>>>create())
-            .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()));
+            .apply(ParDo.of(new ParDoMultiOverrideFactory.ToKeyedWorkItem<String, Integer>()))
+            .setCoder(
+                KeyedWorkItemCoder.of(
+                    StringUtf8Coder.of(),
+                    KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+                    GlobalWindow.Coder.INSTANCE));
 
     p.traverseTopologically(visitor);
     assertThat(visitor.getKeyedPValues(), hasItem(keyed));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index 1b2ff08..b7b59d1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -21,12 +21,10 @@ package org.apache.beam.runners.spark;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.io.IOException;
-import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 7a16f9d..eb0b199 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -171,6 +171,8 @@ public class Pipeline {
    * Runs the {@link Pipeline} using its {@link PipelineRunner}.
    */
   public PipelineResult run() {
+    // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the
+    // pipeline.
     LOG.debug("Running {} via {}", this, runner);
     try {
       return runner.run(this);
@@ -281,6 +283,7 @@ public class Pipeline {
    * <p>Typically invoked by {@link PipelineRunner} subclasses.
    */
   public void traverseTopologically(PipelineVisitor visitor) {
+    // Ensure all nodes are fully specified before visiting the pipeline
     Set<PValue> visitedValues =
         // Visit all the transforms, which should implicitly visit all the values.
         transforms.visit(visitor);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 29e7fcb..3676e1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
@@ -46,6 +47,8 @@ import org.apache.beam.sdk.values.TaggedPValue;
 public class TransformHierarchy {
   private final Node root;
   private final Map<POutput, Node> producers;
+  // A map of PValue to the PInput the producing PTransform is applied to
+  private final Map<PValue, PInput> producerInput;
   // Maintain a stack based on the enclosing nodes
   private Node current;
 
@@ -53,6 +56,7 @@ public class TransformHierarchy {
     root = new Node(null, null, "", null);
     current = root;
     producers = new HashMap<>();
+    producerInput = new HashMap<>();
   }
 
   /**
@@ -86,7 +90,13 @@ public class TransformHierarchy {
   public void finishSpecifyingInput() {
     // Inputs must be completely specified before they are consumed by a transform.
     for (TaggedPValue inputValue : current.getInputs()) {
-      inputValue.getValue().finishSpecifying();
+      Node producerNode = getProducer(inputValue.getValue());
+      PInput input = producerInput.remove(inputValue.getValue());
+      inputValue.getValue().finishSpecifying(input, producerNode.getTransform());
+      checkState(
+          producers.get(inputValue.getValue()) != null,
+          "Producer unknown for input %s",
+          inputValue);
       checkState(
           producers.get(inputValue.getValue()) != null,
           "Producer unknown for input %s",
@@ -105,12 +115,14 @@ public class TransformHierarchy {
    * nodes.
    */
   public void setOutput(POutput output) {
-    output.finishSpecifyingOutput();
     for (TaggedPValue value : output.expand()) {
       if (!producers.containsKey(value.getValue())) {
         producers.put(value.getValue(), current);
       }
+      value.getValue().finishSpecifyingOutput(current.input, current.transform);
+      producerInput.put(value.getValue(), current.input);
     }
+    output.finishSpecifyingOutput(current.input, current.transform);
     current.setOutput(output);
     // TODO: Replace with a "generateDefaultNames" method.
     output.recordAsOutput(current.toAppliedPTransform());
@@ -130,27 +142,26 @@ public class TransformHierarchy {
     return producers.get(produced);
   }
 
-  /**
-   * Returns all producing transforms for the {@link PValue PValues} contained
-   * in {@code output}.
-   */
-  List<Node> getProducingTransforms(POutput output) {
-    List<Node> producingTransforms = new ArrayList<>();
-    for (TaggedPValue value : output.expand()) {
-      Node producer = getProducer(value.getValue());
-      if (producer != null) {
-        producingTransforms.add(producer);
-      }
-    }
-    return producingTransforms;
-  }
-
   public Set<PValue> visit(PipelineVisitor visitor) {
+    finishSpecifying();
     Set<PValue> visitedValues = new HashSet<>();
     root.visit(visitor, visitedValues);
     return visitedValues;
   }
 
+  /**
+   * Finish specifying any remaining nodes within the {@link TransformHierarchy}. These are {@link
+   * PValue PValues} that are produced as output of some {@link PTransform} but are never consumed
+   * as input. These values must still be finished specifying.
+   */
+  private void finishSpecifying() {
+    for (Entry<PValue, PInput> producerInputEntry : producerInput.entrySet()) {
+      PValue value = producerInputEntry.getKey();
+      value.finishSpecifying(producerInputEntry.getValue(), getProducer(value).getTransform());
+    }
+    producerInput.clear();
+  }
+
   public Node getCurrent() {
     return current;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
index 13d4ee1..b373909 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/KeyedPCollectionTuple.java
@@ -152,13 +152,21 @@ public class KeyedPCollectionTuple<K> implements PInput {
     return pipeline;
   }
 
-  @Override
-  public void finishSpecifying() {
-    for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
-      taggedPCollection.pCollection.finishSpecifying();
+  private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) {
+    // TODO: This should already have run coder inference for output, but may not have been consumed
+    // as input yet (and won't be fully specified); This is fine
+
+    // Assumes that the PCollection uses a KvCoder.
+    Coder<?> entryCoder = pc.getCoder();
+    if (!(entryCoder instanceof KvCoder<?, ?>)) {
+      throw new IllegalArgumentException("PCollection does not use a KvCoder");
     }
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
+    return coder.getKeyCoder();
   }
 
+
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -197,7 +205,7 @@ public class KeyedPCollectionTuple<K> implements PInput {
    */
   private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
 
-  private final Coder<K> keyCoder;
+  private Coder<K> keyCoder;
 
   private final CoGbkResultSchema schema;
 
@@ -221,20 +229,6 @@ public class KeyedPCollectionTuple<K> implements PInput {
     this.keyCoder = keyCoder;
   }
 
-  private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) {
-    // Need to run coder inference on this PCollection before inspecting it.
-    pc.finishSpecifying();
-
-    // Assumes that the PCollection uses a KvCoder.
-    Coder<?> entryCoder = pc.getCoder();
-    if (!(entryCoder instanceof KvCoder<?, ?>)) {
-      throw new IllegalArgumentException("PCollection does not use a KvCoder");
-    }
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
-    return coder.getKeyCoder();
-  }
-
   private static <K> List<TaggedKeyedPCollection<K, ?>> copyAddLast(
         List<TaggedKeyedPCollection<K, ?>> keyedCollections,
         TaggedKeyedPCollection<K, ?> taggedCollection) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
index 9aa4615..2ba0f1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java
@@ -69,11 +69,6 @@ public class PBegin implements PInput {
     return Collections.emptyList();
   }
 
-  @Override
-  public void finishSpecifying() {
-    // Nothing more to be done.
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
index e4bb7c5..dcb64a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
@@ -234,17 +234,8 @@ public class PCollectionList<T> implements PInput, POutput {
   }
 
   @Override
-  public void finishSpecifying() {
-    for (TaggedPValue pc : pcollections) {
-      pc.getValue().finishSpecifying();
-    }
-  }
-
-  @Override
-  public void finishSpecifyingOutput() {
-    for (TaggedPValue pc : pcollections) {
-      pc.getValue().finishSpecifyingOutput();
-    }
+  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
+    // All component PCollections will have already been finished.
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index 6afe59e..d61db51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -255,17 +255,8 @@ public class PCollectionTuple implements PInput, POutput {
   }
 
   @Override
-  public void finishSpecifying() {
-    for (PCollection<?> pc : pcollectionMap.values()) {
-      pc.finishSpecifying();
-    }
-  }
-
-  @Override
-  public void finishSpecifyingOutput() {
-    for (PCollection<?> pc : pcollectionMap.values()) {
-      pc.finishSpecifyingOutput();
-    }
+  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
+    // All component PCollections will already have been finished
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
index a27b939..30d4297 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PInput.java
@@ -44,13 +44,4 @@ public interface PInput {
    * <p>Not intended to be invoked directly by user code.
    */
   List<TaggedPValue> expand();
-
-  /**
-   * After building, finalizes this {@code PInput} to make it ready for
-   * being used as an input to a {@link org.apache.beam.sdk.transforms.PTransform}.
-   *
-   * <p>Automatically invoked whenever {@code apply()} is invoked on
-   * this {@code PInput}, so users do not normally call this explicitly.
-   */
-  void finishSpecifying();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
index e5d4504..062f565 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
@@ -61,16 +61,18 @@ public interface POutput {
   void recordAsOutput(AppliedPTransform<?, ?, ?> transform);
 
   /**
-   * As part of applying the producing {@link PTransform}, finalizes this
-   * output to make it ready for being used as an input and for running.
+   * As part of applying the producing {@link PTransform}, finalizes this output to make it ready
+   * for being used as an input and for running.
    *
-   * <p>This includes ensuring that all {@link PCollection PCollections}
-   * have {@link org.apache.beam.sdk.coders.Coder Coders} specified or defaulted.
+   * <p>This includes ensuring that all {@link PCollection PCollections} have {@link
+   * org.apache.beam.sdk.coders.Coder Coders} specified or defaulted.
    *
-   * <p>Automatically invoked whenever this {@link POutput} is used
-   * as a {@link PInput} to another {@link PTransform}, or if never
-   * used as a {@link PInput}, when {@link Pipeline#run}
-   * is called, so users do not normally call this explicitly.
+   * <p>Automatically invoked whenever this {@link POutput} is output, after {@link
+   * PValue#finishSpecifyingOutput(PInput, PTransform)} has been called on each component {@link
+   * PValue} returned by {@link #expand()}.
+   *
+   * @deprecated see BEAM-1199
    */
-  void finishSpecifyingOutput();
+  @Deprecated
+  void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
index 4772c47..cdef58c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java
@@ -89,12 +89,12 @@ public abstract class POutputValueBase implements POutput {
   }
 
   /**
-   * Default behavior for {@link #finishSpecifyingOutput()} is
+   * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is
    * to do nothing. Override if your {@link PValue} requires
    * finalization.
    */
   @Override
-  public void finishSpecifyingOutput() { }
+  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
 
   /**
    * The {@link PTransform} that produces this {@link POutputValueBase}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
index e6dbaf7..052a1f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java
@@ -36,6 +36,7 @@ public interface PValue extends POutput, PInput {
    *
    * <p>For internal use only.
    */
+  @Deprecated
   AppliedPTransform<?, ?, ?> getProducingTransformInternal();
 
   /**
@@ -46,4 +47,18 @@ public interface PValue extends POutput, PInput {
    */
   @Deprecated
   List<TaggedPValue> expand();
+
+  /**
+   * After building, finalizes this {@code PValue} to make it ready for being used as an input to a
+   * {@link org.apache.beam.sdk.transforms.PTransform}.
+   *
+   * <p>Automatically invoked whenever {@code apply()} is invoked on this {@code PValue}, after
+   * {@link PValue#finishSpecifying(PInput, PTransform)} has been called on each component {@link
+   * PValue}, so users do not normally call this explicitly.
+   *
+   * @param upstreamInput the {@link PInput} the {@link PTransform} was applied to to produce this
+   *     output
+   * @param upstreamTransform the {@link PTransform} that produced this {@link PValue}
+   */
+  void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 3a10d5d..7b44737 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -138,8 +138,7 @@ public abstract class PValueBase extends POutputValueBase implements PValue {
   }
 
   @Override
-  public void finishSpecifying() {
-    finishSpecifyingOutput();
+  public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
     finishedSpecifying = true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
index 7afd0a1..de1b99c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.sdk.values;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 
@@ -45,10 +48,8 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
    * couldn't be inferred.
    */
   public Coder<T> getCoder() {
-    if (coder == null) {
-        coder = inferCoderOrFail();
-    }
-    return coder;
+    checkState(coderOrFailure.coder != null, coderOrFailure.failure);
+    return coderOrFailure.coder;
   }
 
   /**
@@ -60,18 +61,18 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
    * {@code apply()} called on it
    */
   public TypedPValue<T> setCoder(Coder<T> coder) {
-    if (isFinishedSpecifyingInternal()) {
-      throw new IllegalStateException(
-          "cannot change the Coder of " + this + " once it's been used");
-    }
-    if (coder == null) {
-      throw new IllegalArgumentException(
-          "Cannot setCoder(null)");
-    }
-    this.coder = coder;
+    checkState(
+        !isFinishedSpecifyingInternal(), "cannot change the Coder of %s once it's been used", this);
+    checkArgument(coder != null, "Cannot setCoder(null)");
+    this.coderOrFailure = new CoderOrFailure<>(coder, null);
     return this;
   }
 
+  @Override
+  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
+    this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
+  }
+
   /**
    * After building, finalizes this {@link PValue} to make it ready for
    * running.  Automatically invoked whenever the {@link PValue} is "used"
@@ -79,24 +80,26 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
    * run (useful if this is a {@link PValue} with no consumers).
    */
   @Override
-  public void finishSpecifying() {
+  public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
     if (isFinishedSpecifyingInternal()) {
       return;
     }
-    super.finishSpecifying();
+    this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
     // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
     // this will throw an exception.
     getCoder();
+    super.finishSpecifying(input, transform);
   }
 
   /////////////////////////////////////////////////////////////////////////////
   // Internal details below here.
 
   /**
-   * The {@link Coder} used by this {@link TypedPValue} to encode and decode the
-   * values stored in it, or null if not specified nor inferred yet.
+   * The {@link Coder} used by this {@link TypedPValue} to encode and decode the values stored in
+   * it, or null if not specified nor inferred yet.
    */
-  private Coder<T> coder;
+  private CoderOrFailure<T> coderOrFailure =
+      new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
 
   protected TypedPValue(Pipeline p) {
     super(p);
@@ -125,34 +128,31 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
   }
 
   /**
-   * If the coder is not explicitly set, this sets the coder for
-   * this {@link TypedPValue} to the best coder that can be inferred
-   * based upon the known {@link TypeDescriptor}. By default, this is null,
-   * but can and should be improved by subclasses.
+   * If the coder is not explicitly set, this sets the coder for this {@link TypedPValue} to the
+   * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this
+   * is null, but can and should be improved by subclasses.
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  private Coder<T> inferCoderOrFail() {
+  private CoderOrFailure<T> inferCoderOrFail(
+      PInput input, PTransform<?, ?> transform, CoderRegistry registry) {
     // First option for a coder: use the Coder set on this PValue.
-    if (coder != null) {
-      return coder;
+    if (coderOrFailure.coder != null) {
+      return coderOrFailure;
     }
 
-    AppliedPTransform<?, ?, ?> application = getProducingTransformInternal();
-
     // Second option for a coder: Look in the coder registry.
-    CoderRegistry registry = getPipeline().getCoderRegistry();
     TypeDescriptor<T> token = getTypeDescriptor();
     CannotProvideCoderException inferFromTokenException = null;
     if (token != null) {
       try {
-          return registry.getDefaultCoder(token);
+        return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
       } catch (CannotProvideCoderException exc) {
         inferFromTokenException = exc;
         // Attempt to detect when the token came from a TupleTag used for a ParDo side output,
         // and provide a better error message if so. Unfortunately, this information is not
         // directly available from the TypeDescriptor, so infer based on the type of the PTransform
         // and the error message itself.
-        if (application.getTransform() instanceof ParDo.BoundMulti
+        if (transform instanceof ParDo.BoundMulti
             && exc.getReason() == ReasonCode.TYPE_ERASURE) {
           inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
               + " If this error occurs for a side output of the producing ParDo, verify that the "
@@ -165,8 +165,8 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
     // Third option for a coder: use the default Coder from the producing PTransform.
     CannotProvideCoderException inputCoderException;
     try {
-      return ((PTransform) application.getTransform()).getDefaultOutputCoder(
-          application.getInput(), this);
+      return new CoderOrFailure<>(
+          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
     } catch (CannotProvideCoderException exc) {
       inputCoderException = exc;
     }
@@ -193,6 +193,16 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue {
     }
 
     // Build and throw the exception.
-    throw new IllegalStateException(messageBuilder.toString());
+    return new CoderOrFailure<>(null, messageBuilder.toString());
+  }
+
+  private static class CoderOrFailure<T> {
+    @Nullable private final Coder<T> coder;
+    @Nullable private final String failure;
+
+    public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) {
+      this.coder = coder;
+      this.failure = failure;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index d790d39..d373caf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -32,9 +33,9 @@ import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -54,13 +55,11 @@ import org.junit.runners.JUnit4;
  * Tests for {@link TransformHierarchy}.
  */
 @RunWith(JUnit4.class)
-public class TransformHierarchyTest {
-
-  @Rule public final TestPipeline pipeline = TestPipeline.create();
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private TransformHierarchy hierarchy;
+public class TransformHierarchyTest implements Serializable {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
 
+  private transient TransformHierarchy hierarchy;
 
   @Before
   public void setup() {
@@ -162,18 +161,21 @@ public class TransformHierarchyTest {
         PCollection.createPrimitiveOutputInternal(
             pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
 
-    MapElements<Long, Long> map = MapElements.via(new SimpleFunction<Long, Long>() {
-      @Override
-      public Long apply(Long input) {
-        return input;
-      }
-    });
+    ParDo.Bound<Long, Long> pardo =
+        ParDo.of(
+            new DoFn<Long, Long>() {
+              @ProcessElement
+              public void processElement(ProcessContext ctxt) {
+                ctxt.output(ctxt.element());
+              }
+            });
 
     PCollection<Long> mapped =
         PCollection.createPrimitiveOutputInternal(
             pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
 
     TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
+    hierarchy.finishSpecifyingInput();
     assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
     assertThat(compositeNode.getInputs(), Matchers.emptyIterable());
     assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
@@ -183,6 +185,7 @@ public class TransformHierarchyTest {
 
     TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
     assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
+    hierarchy.finishSpecifyingInput();
     hierarchy.setOutput(created);
     hierarchy.popNode();
     assertThat(
@@ -199,7 +202,8 @@ public class TransformHierarchyTest {
     assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();
 
-    TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, map);
+    TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", created, pardo);
+    hierarchy.finishSpecifyingInput();
     hierarchy.setOutput(mapped);
     hierarchy.popNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fa8874c..d95b2d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1147,15 +1147,16 @@ public class ParDoTest implements Serializable {
 
     final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main");
     final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unregisteredSide");
-    PCollectionTuple outputTuple = input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag))
-        .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+    ParDo.BoundMulti<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag))
+        .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
 
     outputTuple.get(sideOutputTag).setCoder(new TestDummyCoder());
 
     outputTuple.get(sideOutputTag).apply(View.<TestDummy>asSingleton());
 
     assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder());
-    outputTuple.get(sideOutputTag).finishSpecifyingOutput(); // Check for crashes
+    outputTuple.get(sideOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
     assertEquals(new TestDummyCoder(),
         outputTuple.get(sideOutputTag).getCoder()); // Check for corruption
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99c49b04/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index 8381f12..5e7cc7d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -159,13 +159,16 @@ public class TypedPValueTest {
 
   @Test
   public void testFinishSpecifyingShouldFailIfNoCoderInferrable() {
+    p.enableAbandonedNodeEnforcement(false);
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    ParDo.Bound<Integer, EmptyClass> uninferrableParDo = ParDo.of(new EmptyClassDoFn());
     PCollection<EmptyClass> unencodable =
-        p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));
+        created.apply(uninferrableParDo);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder");
     thrown.expectMessage("Inferring a Coder from the CoderRegistry failed");
 
-    unencodable.finishSpecifying();
+    unencodable.finishSpecifying(created, uninferrableParDo);
   }
 }