Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/25 02:21:45 UTC

[3/5] beam git commit: Add Pipeline rehydration from proto

Add Pipeline rehydration from proto


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43481595
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43481595
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43481595

Branch: refs/heads/master
Commit: 43481595ebc854f4a7188609fd53267497e68124
Parents: 12c277f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:22:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     |   8 +
 .../core/construction/PipelineTranslation.java  | 280 +++++++++++++++++++
 .../core/construction/RehydratedComponents.java |   3 +-
 .../core/construction/SdkComponents.java        |  52 ----
 .../construction/PipelineTranslationTest.java   | 199 +++++++++++++
 .../core/construction/SdkComponentsTest.java    | 107 -------
 .../src/main/proto/beam_runner_api.proto        |   4 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  15 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  69 +++++
 9 files changed, 574 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index d459645..b8365c9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -92,6 +92,7 @@ public class PTransformTranslation {
       List<AppliedPTransform<?, ?, ?>> subtransforms,
       SdkComponents components)
       throws IOException {
+    // TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645
     RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
     for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
       checkArgument(
@@ -136,6 +137,7 @@ public class PTransformTranslation {
         }
         transformBuilder.setSpec(payload);
       }
+      rawPTransform.registerComponents(components);
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
       FunctionSpec payload =
           KNOWN_PAYLOAD_TRANSLATORS
@@ -225,6 +227,8 @@ public class PTransformTranslation {
     public Any getPayload() {
       return null;
     }
+
+    public void registerComponents(SdkComponents components) {}
   }
 
   /**
@@ -255,6 +259,10 @@ public class PTransformTranslation {
         transformSpec.setParameter(payload);
       }
 
+      // Transforms like Combine may have Coders that need to be added but do not
+      // occur in a black-box traversal
+      transform.getTransform().registerComponents(components);
+
       return transformSpec.build();
     }
   }
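
For illustration, a minimal sketch of how a RawPTransform subclass could use the new
registerComponents hook to surface a coder that the black-box graph traversal above would
otherwise miss. The class name, URN string, and coder choice are placeholders, not anything
defined by this commit:

    // Sketch only: register a coder that is not attached to any PCollection in the graph.
    class AccumulatingRawTransform extends PTransformTranslation.RawPTransform<PInput, POutput> {
      private final Coder<Long> accumulatorCoder = VarLongCoder.of();  // placeholder coder

      public String getUrn() {
        return "urn:example:accumulating";  // placeholder URN
      }

      @Override
      public POutput expand(PInput input) {
        throw new IllegalStateException("Raw transforms are never expanded");
      }

      @Override
      public void registerComponents(SdkComponents components) {
        try {
          // Without this, the accumulator coder would never reach the serialized Components.
          components.registerCoder(accumulatorCoder);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }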

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
new file mode 100644
index 0000000..9e4839a
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.protobuf.Any;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Utilities for going to/from Runner API pipelines. */
+public class PipelineTranslation {
+
+  public static RunnerApi.Pipeline toProto(final Pipeline pipeline) {
+    final SdkComponents components = SdkComponents.create();
+    final Collection<String> rootIds = new HashSet<>();
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
+              ArrayListMultimap.create();
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
+                rootIds.add(components.getExistingPTransformId(pipelineRoot));
+              }
+            } else {
+              // TODO: Include DisplayData in the proto
+              children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline));
+              try {
+                components.registerPTransform(
+                    node.toAppliedPTransform(pipeline), children.get(node));
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            // TODO: Include DisplayData in the proto
+            children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline));
+            try {
+              components.registerPTransform(
+                  node.toAppliedPTransform(pipeline),
+                  Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+            } catch (IOException e) {
+              throw new IllegalStateException(e);
+            }
+          }
+        });
+    return RunnerApi.Pipeline.newBuilder()
+        .setComponents(components.toComponents())
+        .addAllRootTransformIds(rootIds)
+        .build();
+  }
+
+  private static DisplayData evaluateDisplayData(HasDisplayData component) {
+    return DisplayData.from(component);
+  }
+
+  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
+      throws IOException {
+    TransformHierarchy transforms = new TransformHierarchy();
+    Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
+
+    // Keeping the PCollections straight is a semantic necessity, but being careful not to explode
+    // the number of coders and windowing strategies is also nice, and helps testing.
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline);
+
+    for (String rootId : pipelineProto.getRootTransformIdsList()) {
+      addRehydratedTransform(
+          transforms,
+          pipelineProto.getComponents().getTransformsOrThrow(rootId),
+          pipeline,
+          pipelineProto.getComponents().getTransformsMap(),
+          rehydratedComponents);
+    }
+
+    return pipeline;
+  }
+
+  private static void addRehydratedTransform(
+      TransformHierarchy transforms,
+      RunnerApi.PTransform transformProto,
+      Pipeline pipeline,
+      Map<String, RunnerApi.PTransform> transformProtos,
+      RehydratedComponents rehydratedComponents)
+      throws IOException {
+
+    Map<TupleTag<?>, PValue> rehydratedInputs = new HashMap<>();
+    for (Map.Entry<String, String> inputEntry : transformProto.getInputsMap().entrySet()) {
+      rehydratedInputs.put(
+          new TupleTag<>(inputEntry.getKey()),
+          rehydratedComponents.getPCollection(inputEntry.getValue()));
+    }
+
+    Map<TupleTag<?>, PValue> rehydratedOutputs = new HashMap<>();
+    for (Map.Entry<String, String> outputEntry : transformProto.getOutputsMap().entrySet()) {
+      rehydratedOutputs.put(
+          new TupleTag<>(outputEntry.getKey()),
+          rehydratedComponents.getPCollection(outputEntry.getValue()));
+    }
+
+    RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
+
+    // By default, no "additional" inputs, since that is an SDK-specific thing.
+    // Only ParDo really separates main from side inputs
+    Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
+
+    // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+    if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
+      RunnerApi.ParDoPayload payload =
+          transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class);
+
+      List<PCollectionView<?>> views = new ArrayList<>();
+      for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
+          payload.getSideInputsMap().entrySet()) {
+        String localName = sideInputEntry.getKey();
+        RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+        PCollection<?> pCollection =
+            (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
+        views.add(
+            ParDoTranslation.viewFromProto(
+                sideInputEntry.getValue(),
+                sideInputEntry.getKey(),
+                pCollection,
+                transformProto,
+                rehydratedComponents));
+      }
+      additionalInputs = PCollectionViews.toAdditionalInputs(views);
+    }
+
+    // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+    List<Coder<?>> additionalCoders = Collections.emptyList();
+    if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
+      RunnerApi.CombinePayload payload =
+          transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class);
+      additionalCoders =
+          (List)
+              Collections.singletonList(
+                  rehydratedComponents.getCoder(payload.getAccumulatorCoderId()));
+    }
+
+    RehydratedPTransform transform =
+        RehydratedPTransform.of(
+            transformSpec.getUrn(),
+            transformSpec.getParameter(),
+            additionalInputs,
+            additionalCoders);
+
+    if (isPrimitive(transformProto)) {
+      transforms.addFinalizedPrimitiveNode(
+          transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
+    } else {
+      transforms.pushFinalizedNode(
+          transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
+
+      for (String childTransformId : transformProto.getSubtransformsList()) {
+        addRehydratedTransform(
+            transforms,
+            transformProtos.get(childTransformId),
+            pipeline,
+            transformProtos,
+            rehydratedComponents);
+      }
+
+      transforms.popNode();
+    }
+  }
+
+  // A primitive transform is one with outputs that are not in its input and also
+  // not produced by a subtransform.
+  private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
+    return transformProto.getSubtransformsCount() == 0
+        && !transformProto
+        .getInputsMap()
+        .values()
+        .containsAll(transformProto.getOutputsMap().values());
+  }
+
+  @AutoValue
+  abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
+
+    @Nullable
+    public abstract String getUrn();
+
+    @Nullable
+    public abstract Any getPayload();
+
+    @Override
+    public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
+
+    public abstract List<Coder<?>> getCoders();
+
+    public static RehydratedPTransform of(
+        String urn,
+        Any payload,
+        Map<TupleTag<?>, PValue> additionalInputs,
+        List<Coder<?>> additionalCoders) {
+      return new AutoValue_PipelineTranslation_RehydratedPTransform(
+          urn, payload, additionalInputs, additionalCoders);
+    }
+
+    @Override
+    public POutput expand(PInput input) {
+      throw new IllegalStateException(
+          String.format(
+              "%s should never be asked to expand;"
+                  + " it is the result of deserializing an already-constructed Pipeline",
+              getClass().getSimpleName()));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("urn", getUrn())
+          .add("payload", getPayload())
+          .toString();
+    }
+
+    @Override
+    public void registerComponents(SdkComponents components) {
+      for (Coder<?> coder : getCoders()) {
+        try {
+          components.registerCoder(coder);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+}
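
For reference, a minimal round-trip sketch using the two entry points added here; it mirrors what
PipelineTranslationTest (below) exercises, assuming a trivially constructed pipeline and default
options:

    // Sketch only: serialize a pipeline to its Runner API proto and rehydrate it.
    Pipeline original = Pipeline.create();
    original.apply(Create.of(1, 2, 3));

    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(original);
    Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto);  // throws IOException

    // The rehydrated pipeline can be traversed like any other (e.g. for runner-side translation),
    // but its RehydratedPTransforms must never be expanded again.
    rehydrated.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {});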

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index a9a34d7..ccdd4a7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -93,7 +93,8 @@ public class RehydratedComponents {
                       PCollection.class.getSimpleName(),
                       Pipeline.class.getSimpleName());
                   return PCollectionTranslation.fromProto(
-                      components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this);
+                      components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this)
+                      .setName(id);
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 0d3ba60..54d2e9d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -22,24 +22,16 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Equivalence;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ListMultimap;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -62,50 +54,6 @@ public class SdkComponents {
     return new SdkComponents();
   }
 
-  public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) {
-    final SdkComponents components = create();
-    final Collection<String> rootIds = new HashSet<>();
-    pipeline.traverseTopologically(
-        new PipelineVisitor.Defaults() {
-          private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
-              ArrayListMultimap.create();
-
-          @Override
-          public void leaveCompositeTransform(Node node) {
-            if (node.isRootNode()) {
-              for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
-                rootIds.add(components.getExistingPTransformId(pipelineRoot));
-              }
-            } else {
-              children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
-              try {
-                components.registerPTransform(
-                    node.toAppliedPTransform(getPipeline()), children.get(node));
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            }
-          }
-
-          @Override
-          public void visitPrimitiveTransform(Node node) {
-            children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
-            try {
-              components.registerPTransform(
-                  node.toAppliedPTransform(getPipeline()),
-                  Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-            } catch (IOException e) {
-              throw new IllegalStateException(e);
-            }
-          }
-        });
-    // TODO: Display Data
-    return RunnerApi.Pipeline.newBuilder()
-        .setComponents(components.toComponents())
-        .addAllRootTransformIds(rootIds)
-        .build();
-  }
-
   private SdkComponents() {
     this.componentsBuilder = RunnerApi.Components.newBuilder();
     this.transformIds = HashBiMap.create();

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
new file mode 100644
index 0000000..9e6dff4
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Equivalence;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link PipelineTranslation}. */
+@RunWith(Parameterized.class)
+public class PipelineTranslationTest {
+  @Parameter(0)
+  public Pipeline pipeline;
+
+  @Parameters(name = "{index}")
+  public static Iterable<Pipeline> testPipelines() {
+    Pipeline trivialPipeline = Pipeline.create();
+    trivialPipeline.apply(Create.of(1, 2, 3));
+
+    Pipeline sideInputPipeline = Pipeline.create();
+    final PCollectionView<String> singletonView =
+        sideInputPipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
+    sideInputPipeline
+        .apply(Create.of("main input"))
+        .apply(
+            ParDo.of(
+                    new DoFn<String, String>() {
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        // actually never executed and no effect on translation
+                        c.sideInput(singletonView);
+                      }
+                    })
+                .withSideInputs(singletonView));
+
+    Pipeline complexPipeline = Pipeline.create();
+    BigEndianLongCoder customCoder = BigEndianLongCoder.of();
+    PCollection<Long> elems = complexPipeline.apply(GenerateSequence.from(0L).to(207L));
+    PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
+    PCollection<Long> windowed =
+        counted.apply(
+            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
+                .triggering(
+                    AfterWatermark.pastEndOfWindow()
+                        .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
+                .accumulatingFiredPanes()
+                .withAllowedLateness(Duration.standardMinutes(3L)));
+    final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
+    PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
+    PCollection<KV<String, Iterable<Long>>> grouped =
+        keyed.apply(GroupByKey.<String, Long>create());
+
+    return ImmutableList.of(trivialPipeline, sideInputPipeline, complexPipeline);
+  }
+
+  @Test
+  public void testProtoDirectly() {
+    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+    pipeline.traverseTopologically(
+        new PipelineProtoVerificationVisitor(pipelineProto));
+  }
+
+  @Test
+  public void testProtoAgainstRehydrated() throws Exception {
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+    Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto);
+
+    rehydrated.traverseTopologically(
+        new PipelineProtoVerificationVisitor(pipelineProto));
+  }
+
+  private static class PipelineProtoVerificationVisitor extends PipelineVisitor.Defaults {
+
+    private final RunnerApi.Pipeline pipelineProto;
+    Set<Node> transforms;
+    Set<PCollection<?>> pcollections;
+    Set<Equivalence.Wrapper<? extends Coder<?>>> coders;
+    Set<WindowingStrategy<?, ?>> windowingStrategies;
+
+    public PipelineProtoVerificationVisitor(RunnerApi.Pipeline pipelineProto) {
+      this.pipelineProto = pipelineProto;
+      transforms = new HashSet<>();
+      pcollections = new HashSet<>();
+      coders = new HashSet<>();
+      windowingStrategies = new HashSet<>();
+    }
+
+    @Override
+    public void leaveCompositeTransform(Node node) {
+      if (node.isRootNode()) {
+        assertThat(
+            "Unexpected number of PTransforms",
+            pipelineProto.getComponents().getTransformsCount(),
+            equalTo(transforms.size()));
+        assertThat(
+            "Unexpected number of PCollections",
+            pipelineProto.getComponents().getPcollectionsCount(),
+            equalTo(pcollections.size()));
+        assertThat(
+            "Unexpected number of Coders",
+            pipelineProto.getComponents().getCodersCount(),
+            equalTo(coders.size()));
+        assertThat(
+            "Unexpected number of Windowing Strategies",
+            pipelineProto.getComponents().getWindowingStrategiesCount(),
+            equalTo(windowingStrategies.size()));
+      } else {
+        transforms.add(node);
+        if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
+            PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
+          // Combine translation introduces a coder that is not assigned to any PCollection
+          // in the default expansion, and must be explicitly added here.
+          try {
+            addCoders(
+                CombineTranslation.getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void visitPrimitiveTransform(Node node) {
+      transforms.add(node);
+    }
+
+    @Override
+    public void visitValue(PValue value, Node producer) {
+      if (value instanceof PCollection) {
+        PCollection pc = (PCollection) value;
+        pcollections.add(pc);
+        addCoders(pc.getCoder());
+        windowingStrategies.add(pc.getWindowingStrategy());
+        addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
+      }
+    }
+
+    private void addCoders(Coder<?> coder) {
+      coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
+      if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) {
+        for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
+          addCoders(component);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index ce6a99f..82840d6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,43 +24,25 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Equivalence;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.hamcrest.Matchers;
-import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -78,95 +60,6 @@ public class SdkComponentsTest {
   private SdkComponents components = SdkComponents.create();
 
   @Test
-  public void translatePipeline() {
-    BigEndianLongCoder customCoder = BigEndianLongCoder.of();
-    PCollection<Long> elems = pipeline.apply(GenerateSequence.from(0L).to(207L));
-    PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
-    PCollection<Long> windowed =
-        counted.apply(
-            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
-                .triggering(
-                    AfterWatermark.pastEndOfWindow()
-                        .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
-                .accumulatingFiredPanes()
-                .withAllowedLateness(Duration.standardMinutes(3L)));
-    final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
-    PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
-    PCollection<KV<String, Iterable<Long>>> grouped =
-        keyed.apply(GroupByKey.<String, Long>create());
-
-    final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
-    pipeline.traverseTopologically(
-        new PipelineVisitor.Defaults() {
-          Set<Node> transforms = new HashSet<>();
-          Set<PCollection<?>> pcollections = new HashSet<>();
-          Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
-          Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
-
-          @Override
-          public void leaveCompositeTransform(Node node) {
-            if (node.isRootNode()) {
-              assertThat(
-                  "Unexpected number of PTransforms",
-                  pipelineProto.getComponents().getTransformsCount(),
-                  equalTo(transforms.size()));
-              assertThat(
-                  "Unexpected number of PCollections",
-                  pipelineProto.getComponents().getPcollectionsCount(),
-                  equalTo(pcollections.size()));
-              assertThat(
-                  "Unexpected number of Coders",
-                  pipelineProto.getComponents().getCodersCount(),
-                  equalTo(coders.size()));
-              assertThat(
-                  "Unexpected number of Windowing Strategies",
-                  pipelineProto.getComponents().getWindowingStrategiesCount(),
-                  equalTo(windowingStrategies.size()));
-            } else {
-              transforms.add(node);
-              if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
-                  PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
-                // Combine translation introduces a coder that is not assigned to any PCollection
-                // in the default expansion, and must be explicitly added here.
-                try {
-                  addCoders(
-                      CombineTranslation.getAccumulatorCoder(
-                          node.toAppliedPTransform(getPipeline())));
-                } catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            }
-          }
-
-          @Override
-          public void visitPrimitiveTransform(Node node) {
-            transforms.add(node);
-          }
-
-          @Override
-          public void visitValue(PValue value, Node producer) {
-            if (value instanceof PCollection) {
-              PCollection pc = (PCollection) value;
-              pcollections.add(pc);
-              addCoders(pc.getCoder());
-              windowingStrategies.add(pc.getWindowingStrategy());
-              addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
-            }
-          }
-
-          private void addCoders(Coder<?> coder) {
-            coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
-            if (coder instanceof StructuredCoder) {
-              for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
-                addCoders(component);
-              }
-            }
-          }
-        });
-  }
-
-  @Test
   public void registerCoder() throws IOException {
     Coder<?> coder =
         KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 711da2a..0c433fa 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -92,7 +92,9 @@ message Pipeline {
   // this pipeline.
   Components components = 1;
 
-  // (Required) The ids of all PTransforms that are not contained within another PTransform
+  // (Required) The ids of all PTransforms that are not contained within another PTransform.
+  // These must be in shallow topological order, so that traversing them recursively
+  // in this order yields a recursively topological traversal.
   repeated string root_transform_ids = 2;
 
   // (Optional) Static display data for the pipeline. If there is none,

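To illustrate the ordering requirement on root_transform_ids (the transform ids here are made up):

    // Sketch only: if root "ProcessAfterRead" consumes a PCollection produced by root "Read",
    // then "Read" must be listed first; ids of subtransforms never appear at this level.
    RunnerApi.Pipeline.Builder pipelineBuilder = RunnerApi.Pipeline.newBuilder();
    pipelineBuilder.addRootTransformIds("Read");              // hypothetical upstream root id
    pipelineBuilder.addRootTransformIds("ProcessAfterRead");  // hypothetical downstream root id
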
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index bdf8a12..760efb3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -180,6 +180,12 @@ public class Pipeline {
     return begin().apply(name, root);
   }
 
+  @Internal
+  public static Pipeline forTransformHierarchy(
+      TransformHierarchy transforms, PipelineOptions options) {
+    return new Pipeline(transforms, options);
+  }
+
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
@@ -476,16 +482,21 @@ public class Pipeline {
   /////////////////////////////////////////////////////////////////////////////
   // Below here are internal operations, never called by users.
 
-  private final TransformHierarchy transforms = new TransformHierarchy();
+  private final TransformHierarchy transforms;
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
   private final PipelineOptions defaultOptions;
 
-  protected Pipeline(PipelineOptions options) {
+  private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
+    this.transforms = transforms;
     this.defaultOptions = options;
   }
 
+  protected Pipeline(PipelineOptions options) {
+    this(new TransformHierarchy(), options);
+  }
+
   @Override
   public String toString() {
     return "Pipeline#" + hashCode();

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d8ff59e..c2d5771 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
@@ -98,6 +99,48 @@ public class TransformHierarchy {
     return current;
   }
 
+  @Internal
+  public Node pushFinalizedNode(
+      String name,
+      Map<TupleTag<?>, PValue> inputs,
+      PTransform<?, ?> transform,
+      Map<TupleTag<?>, PValue> outputs) {
+    checkNotNull(
+        transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        inputs, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    Node node = new Node(current, transform, name, inputs, outputs);
+    node.finishedSpecifying = true;
+    current.addComposite(node);
+    current = node;
+    return current;
+  }
+
+  @Internal
+  public Node addFinalizedPrimitiveNode(
+      String name,
+      Map<TupleTag<?>, PValue> inputs,
+      PTransform<?, ?> transform,
+      Map<TupleTag<?>, PValue> outputs) {
+    checkNotNull(
+        transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        inputs, "Inputs must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        outputs, "Outputs must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    Node node = new Node(current, transform, name, inputs, outputs);
+    node.finishedSpecifying = true;
+    for (PValue output : outputs.values()) {
+      producers.put(output, node);
+    }
+    current.addComposite(node);
+    return node;
+  }
+
   public Node replaceNode(Node existing, PInput input, PTransform<?, ?> transform) {
     checkNotNull(existing);
     checkNotNull(input);
@@ -321,6 +364,32 @@ public class TransformHierarchy {
     }
 
     /**
+     * Creates a new {@link Node} with the given parent and transform, where inputs and outputs
+     * are already known.
+     *
+     * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+     * nodes.
+     *
+     * @param enclosingNode the composite node containing this node
+     * @param transform the PTransform tracked by this node
+     * @param fullName the fully qualified name of the transform
+     * @param inputs the expanded inputs to the transform
+     * @param outputs the expanded outputs of the transform
+     */
+    private Node(
+        @Nullable Node enclosingNode,
+        @Nullable PTransform<?, ?> transform,
+        String fullName,
+        @Nullable Map<TupleTag<?>, PValue> inputs,
+        @Nullable Map<TupleTag<?>, PValue> outputs) {
+      this.enclosingNode = enclosingNode;
+      this.transform = transform;
+      this.fullName = fullName;
+      this.inputs = inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
+      this.outputs = outputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : outputs;
+    }
+
+    /**
      * Returns the transform associated with this transform node.
      */
     public PTransform<?, ?> getTransform() {