You are viewing a plain text version of this content. The canonical HTML version is available from the mailing-list archive (the original hyperlink was lost in the plain-text conversion).
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/25 02:21:43 UTC

[1/5] beam git commit: Fix tests that passed invalid input to DynamicDestinations

Repository: beam
Updated Branches:
  refs/heads/master 0064fb37a -> 01408c864


Fix tests that passed invalid input to DynamicDestinations


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12c277f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12c277f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12c277f3

Branch: refs/heads/master
Commit: 12c277f31b2b8a295e0a41cab3290943a3eff7cd
Parents: efe2dc1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 21 20:32:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:25 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    | 25 +++++++++++++++++-
 .../direct/WriteWithShardingFactoryTest.java    | 27 +++++++++++++++++++-
 .../runners/dataflow/DataflowRunnerTest.java    | 24 ++++++++++++++++-
 .../beam/sdk/io/DynamicFileDestinations.java    |  6 ++++-
 4 files changed, 78 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 99d3dd1..316645b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Collections;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -62,7 +63,9 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -547,7 +550,8 @@ public class PTransformMatchersTest implements Serializable {
     WriteFiles<Integer, Void, Integer> write =
         WriteFiles.to(
             new FileBasedSink<Integer, Void>(
-                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+                StaticValueProvider.of(outputDirectory),
+                DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
               @Override
               public WriteOperation<Integer, Void> createWriteOperation() {
                 return null;
@@ -580,4 +584,23 @@ public class PTransformMatchersTest implements Serializable {
         write,
         p);
   }
+
+  private static class FakeFilenamePolicy extends FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+
+    @Nullable
+    @Override
+    public ResourceId unwindowedFilename(
+        int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 546a181..6dd069c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -55,7 +56,9 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -141,7 +144,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
     PTransform<PCollection<Object>, PDone> original =
         WriteFiles.to(
             new FileBasedSink<Object, Void>(
-                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+                StaticValueProvider.of(outputDirectory),
+                DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
               @Override
               public WriteOperation<Object, Void> createWriteOperation() {
                 throw new IllegalArgumentException("Should not be used");
@@ -234,4 +238,25 @@ public class WriteWithShardingFactoryTest implements Serializable {
     List<Integer> shards = fnTester.processBundle((long) count);
     assertThat(shards, containsInAnyOrder(13));
   }
+
+  private static class FakeFilenamePolicy extends FileBasedSink.FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new IllegalArgumentException("Should not be used");
+    }
+
+    @Nullable
+    @Override
+    public ResourceId unwindowedFilename(
+        int shardNumber,
+        int numShards,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new IllegalArgumentException("Should not be used");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 94985f8..7556a28 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -64,6 +64,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -81,6 +82,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -102,6 +104,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -1293,7 +1297,25 @@ public class DataflowRunnerTest implements Serializable {
     TestSink(String tmpFolder) {
       super(
           StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
-          DynamicFileDestinations.constant(null));
+          DynamicFileDestinations.constant(
+              new FilenamePolicy() {
+                @Override
+                public ResourceId windowedFilename(
+                    int shardNumber,
+                    int numShards,
+                    BoundedWindow window,
+                    PaneInfo paneInfo,
+                    OutputFileHints outputFileHints) {
+                  throw new UnsupportedOperationException("should not be called");
+                }
+
+                @Nullable
+                @Override
+                public ResourceId unwindowedFilename(
+                    int shardNumber, int numShards, OutputFileHints outputFileHints) {
+                  throw new UnsupportedOperationException("should not be called");
+                }
+              }));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index e7ef0f6..d05a01a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
@@ -34,7 +37,7 @@ public class DynamicFileDestinations {
     private final FilenamePolicy filenamePolicy;
 
     public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
-      this.filenamePolicy = filenamePolicy;
+      this.filenamePolicy = checkNotNull(filenamePolicy);
     }
 
     @Override
@@ -59,6 +62,7 @@ public class DynamicFileDestinations {
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
+      checkState(filenamePolicy != null);
       filenamePolicy.populateDisplayData(builder);
     }
   }

[5/5] beam git commit: This closes #3334: [BEAM-2333] Go to proto and back before running a pipeline in Java DirectRunner

Posted by ke...@apache.org.
This closes #3334: [BEAM-2333] Go to proto and back before running a pipeline in Java DirectRunner

  Dehydrate then rehydrate Pipeline before DirectRunner.run()
  Add Pipeline rehydration from proto
  Fix tests that passed invalid input to DynamicDestinations
  Add stub DisplayDataTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01408c86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01408c86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01408c86

Branch: refs/heads/master
Commit: 01408c864e9d844f4ffb74cc3f18276ff6a5c447
Parents: 0064fb3 8ca4591
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 24 18:59:57 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:59:57 2017 -0700

----------------------------------------------------------------------
 .../construction/DisplayDataTranslation.java    |  39 +++
 .../construction/PTransformTranslation.java     |  12 +-
 .../core/construction/PipelineTranslation.java  | 280 +++++++++++++++++++
 .../core/construction/RehydratedComponents.java |   3 +-
 .../core/construction/SdkComponents.java        |  52 ----
 .../construction/PTransformMatchersTest.java    |  25 +-
 .../construction/PipelineTranslationTest.java   | 199 +++++++++++++
 .../core/construction/SdkComponentsTest.java    | 107 -------
 .../beam/runners/direct/DirectRunner.java       |  11 +-
 .../runners/direct/ViewOverrideFactoryTest.java |  41 ---
 .../direct/WriteWithShardingFactoryTest.java    |  27 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  24 +-
 .../src/main/proto/beam_runner_api.proto        |   4 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  15 +-
 .../beam/sdk/io/DynamicFileDestinations.java    |   6 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  69 +++++
 16 files changed, 704 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


[3/5] beam git commit: Add Pipeline rehydration from proto

Posted by ke...@apache.org.
Add Pipeline rehydration from proto


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43481595
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43481595
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43481595

Branch: refs/heads/master
Commit: 43481595ebc854f4a7188609fd53267497e68124
Parents: 12c277f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:22:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     |   8 +
 .../core/construction/PipelineTranslation.java  | 280 +++++++++++++++++++
 .../core/construction/RehydratedComponents.java |   3 +-
 .../core/construction/SdkComponents.java        |  52 ----
 .../construction/PipelineTranslationTest.java   | 199 +++++++++++++
 .../core/construction/SdkComponentsTest.java    | 107 -------
 .../src/main/proto/beam_runner_api.proto        |   4 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  15 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  69 +++++
 9 files changed, 574 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index d459645..b8365c9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -92,6 +92,7 @@ public class PTransformTranslation {
       List<AppliedPTransform<?, ?, ?>> subtransforms,
       SdkComponents components)
       throws IOException {
+    // TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645
     RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
     for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
       checkArgument(
@@ -136,6 +137,7 @@ public class PTransformTranslation {
         }
         transformBuilder.setSpec(payload);
       }
+      rawPTransform.registerComponents(components);
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
       FunctionSpec payload =
           KNOWN_PAYLOAD_TRANSLATORS
@@ -225,6 +227,8 @@ public class PTransformTranslation {
     public Any getPayload() {
       return null;
     }
+
+    public void registerComponents(SdkComponents components) {}
   }
 
   /**
@@ -255,6 +259,10 @@ public class PTransformTranslation {
         transformSpec.setParameter(payload);
       }
 
+      // Transforms like Combine may have Coders that need to be added but do not
+      // occur in a black-box traversal
+      transform.getTransform().registerComponents(components);
+
       return transformSpec.build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
new file mode 100644
index 0000000..9e4839a
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.protobuf.Any;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Utilities for going to/from Runner API pipelines. */
+public class PipelineTranslation {
+
+  public static RunnerApi.Pipeline toProto(final Pipeline pipeline) {
+    final SdkComponents components = SdkComponents.create();
+    final Collection<String> rootIds = new HashSet<>();
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
+              ArrayListMultimap.create();
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
+                rootIds.add(components.getExistingPTransformId(pipelineRoot));
+              }
+            } else {
+              // TODO: Include DisplayData in the proto
+              children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline));
+              try {
+                components.registerPTransform(
+                    node.toAppliedPTransform(pipeline), children.get(node));
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            // TODO: Include DisplayData in the proto
+            children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline));
+            try {
+              components.registerPTransform(
+                  node.toAppliedPTransform(pipeline),
+                  Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+            } catch (IOException e) {
+              throw new IllegalStateException(e);
+            }
+          }
+        });
+    return RunnerApi.Pipeline.newBuilder()
+        .setComponents(components.toComponents())
+        .addAllRootTransformIds(rootIds)
+        .build();
+  }
+
+  private static DisplayData evaluateDisplayData(HasDisplayData component) {
+    return DisplayData.from(component);
+  }
+
+  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
+      throws IOException {
+    TransformHierarchy transforms = new TransformHierarchy();
+    Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
+
+    // Keeping the PCollections straight is a semantic necessity, but being careful not to explode
+    // the number of coders and windowing strategies is also nice, and helps testing.
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline);
+
+    for (String rootId : pipelineProto.getRootTransformIdsList()) {
+      addRehydratedTransform(
+          transforms,
+          pipelineProto.getComponents().getTransformsOrThrow(rootId),
+          pipeline,
+          pipelineProto.getComponents().getTransformsMap(),
+          rehydratedComponents);
+    }
+
+    return pipeline;
+  }
+
+  private static void addRehydratedTransform(
+      TransformHierarchy transforms,
+      RunnerApi.PTransform transformProto,
+      Pipeline pipeline,
+      Map<String, RunnerApi.PTransform> transformProtos,
+      RehydratedComponents rehydratedComponents)
+      throws IOException {
+
+    Map<TupleTag<?>, PValue> rehydratedInputs = new HashMap<>();
+    for (Map.Entry<String, String> inputEntry : transformProto.getInputsMap().entrySet()) {
+      rehydratedInputs.put(
+          new TupleTag<>(inputEntry.getKey()),
+          rehydratedComponents.getPCollection(inputEntry.getValue()));
+    }
+
+    Map<TupleTag<?>, PValue> rehydratedOutputs = new HashMap<>();
+    for (Map.Entry<String, String> outputEntry : transformProto.getOutputsMap().entrySet()) {
+      rehydratedOutputs.put(
+          new TupleTag<>(outputEntry.getKey()),
+          rehydratedComponents.getPCollection(outputEntry.getValue()));
+    }
+
+    RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
+
+    // By default, no "additional" inputs, since that is an SDK-specific thing.
+    // Only ParDo really separates main from side inputs
+    Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
+
+    // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+    if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
+      RunnerApi.ParDoPayload payload =
+          transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class);
+
+      List<PCollectionView<?>> views = new ArrayList<>();
+      for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
+          payload.getSideInputsMap().entrySet()) {
+        String localName = sideInputEntry.getKey();
+        RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+        PCollection<?> pCollection =
+            (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
+        views.add(
+            ParDoTranslation.viewFromProto(
+                sideInputEntry.getValue(),
+                sideInputEntry.getKey(),
+                pCollection,
+                transformProto,
+                rehydratedComponents));
+      }
+      additionalInputs = PCollectionViews.toAdditionalInputs(views);
+    }
+
+    // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+    List<Coder<?>> additionalCoders = Collections.emptyList();
+    if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
+      RunnerApi.CombinePayload payload =
+          transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class);
+      additionalCoders =
+          (List)
+              Collections.singletonList(
+                  rehydratedComponents.getCoder(payload.getAccumulatorCoderId()));
+    }
+
+    RehydratedPTransform transform =
+        RehydratedPTransform.of(
+            transformSpec.getUrn(),
+            transformSpec.getParameter(),
+            additionalInputs,
+            additionalCoders);
+
+    if (isPrimitive(transformProto)) {
+      transforms.addFinalizedPrimitiveNode(
+          transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
+    } else {
+      transforms.pushFinalizedNode(
+          transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
+
+      for (String childTransformId : transformProto.getSubtransformsList()) {
+        addRehydratedTransform(
+            transforms,
+            transformProtos.get(childTransformId),
+            pipeline,
+            transformProtos,
+            rehydratedComponents);
+      }
+
+      transforms.popNode();
+    }
+  }
+
+  // A primitive transform is one with outputs that are not in its input and also
+  // not produced by a subtransform.
+  private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
+    return transformProto.getSubtransformsCount() == 0
+        && !transformProto
+        .getInputsMap()
+        .values()
+        .containsAll(transformProto.getOutputsMap().values());
+  }
+
+  @AutoValue
+  abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
+
+    @Nullable
+    public abstract String getUrn();
+
+    @Nullable
+    public abstract Any getPayload();
+
+    @Override
+    public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
+
+    public abstract List<Coder<?>> getCoders();
+
+    public static RehydratedPTransform of(
+        String urn,
+        Any payload,
+        Map<TupleTag<?>, PValue> additionalInputs,
+        List<Coder<?>> additionalCoders) {
+      return new AutoValue_PipelineTranslation_RehydratedPTransform(
+          urn, payload, additionalInputs, additionalCoders);
+    }
+
+    @Override
+    public POutput expand(PInput input) {
+      throw new IllegalStateException(
+          String.format(
+              "%s should never be asked to expand;"
+                  + " it is the result of deserializing an already-constructed Pipeline",
+              getClass().getSimpleName()));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("urn", getUrn())
+          .add("payload", getPayload())
+          .toString();
+    }
+
+    @Override
+    public void registerComponents(SdkComponents components) {
+      for (Coder<?> coder : getCoders()) {
+        try {
+          components.registerCoder(coder);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index a9a34d7..ccdd4a7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -93,7 +93,8 @@ public class RehydratedComponents {
                       PCollection.class.getSimpleName(),
                       Pipeline.class.getSimpleName());
                   return PCollectionTranslation.fromProto(
-                      components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this);
+                      components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this)
+                      .setName(id);
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 0d3ba60..54d2e9d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -22,24 +22,16 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Equivalence;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ListMultimap;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -62,50 +54,6 @@ public class SdkComponents {
     return new SdkComponents();
   }
 
-  public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) {
-    final SdkComponents components = create();
-    final Collection<String> rootIds = new HashSet<>();
-    pipeline.traverseTopologically(
-        new PipelineVisitor.Defaults() {
-          private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
-              ArrayListMultimap.create();
-
-          @Override
-          public void leaveCompositeTransform(Node node) {
-            if (node.isRootNode()) {
-              for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
-                rootIds.add(components.getExistingPTransformId(pipelineRoot));
-              }
-            } else {
-              children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
-              try {
-                components.registerPTransform(
-                    node.toAppliedPTransform(getPipeline()), children.get(node));
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            }
-          }
-
-          @Override
-          public void visitPrimitiveTransform(Node node) {
-            children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
-            try {
-              components.registerPTransform(
-                  node.toAppliedPTransform(getPipeline()),
-                  Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-            } catch (IOException e) {
-              throw new IllegalStateException(e);
-            }
-          }
-        });
-    // TODO: Display Data
-    return RunnerApi.Pipeline.newBuilder()
-        .setComponents(components.toComponents())
-        .addAllRootTransformIds(rootIds)
-        .build();
-  }
-
   private SdkComponents() {
     this.componentsBuilder = RunnerApi.Components.newBuilder();
     this.transformIds = HashBiMap.create();

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
new file mode 100644
index 0000000..9e6dff4
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Equivalence;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link PipelineTranslation}. */
+@RunWith(Parameterized.class)
+public class PipelineTranslationTest {
+  @Parameter(0)
+  public Pipeline pipeline;
+
+  @Parameters(name = "{index}")
+  public static Iterable<Pipeline> testPipelines() {
+    Pipeline trivialPipeline = Pipeline.create();
+    trivialPipeline.apply(Create.of(1, 2, 3));
+
+    Pipeline sideInputPipeline = Pipeline.create();
+    final PCollectionView<String> singletonView =
+        sideInputPipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
+    sideInputPipeline
+        .apply(Create.of("main input"))
+        .apply(
+            ParDo.of(
+                    new DoFn<String, String>() {
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        // actually never executed and no effect on translation
+                        c.sideInput(singletonView);
+                      }
+                    })
+                .withSideInputs(singletonView));
+
+    Pipeline complexPipeline = Pipeline.create();
+    BigEndianLongCoder customCoder = BigEndianLongCoder.of();
+    PCollection<Long> elems = complexPipeline.apply(GenerateSequence.from(0L).to(207L));
+    PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
+    PCollection<Long> windowed =
+        counted.apply(
+            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
+                .triggering(
+                    AfterWatermark.pastEndOfWindow()
+                        .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
+                .accumulatingFiredPanes()
+                .withAllowedLateness(Duration.standardMinutes(3L)));
+    final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
+    PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
+    PCollection<KV<String, Iterable<Long>>> grouped =
+        keyed.apply(GroupByKey.<String, Long>create());
+
+    return ImmutableList.of(trivialPipeline, sideInputPipeline, complexPipeline);
+  }
+
+  @Test
+  public void testProtoDirectly() {
+    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+    pipeline.traverseTopologically(
+        new PipelineProtoVerificationVisitor(pipelineProto));
+  }
+
+  @Test
+  public void testProtoAgainstRehydrated() throws Exception {
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+    Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto);
+
+    rehydrated.traverseTopologically(
+        new PipelineProtoVerificationVisitor(pipelineProto));
+  }
+
+  private static class PipelineProtoVerificationVisitor extends PipelineVisitor.Defaults {
+
+    private final RunnerApi.Pipeline pipelineProto;
+    Set<Node> transforms;
+    Set<PCollection<?>> pcollections;
+    Set<Equivalence.Wrapper<? extends Coder<?>>> coders;
+    Set<WindowingStrategy<?, ?>> windowingStrategies;
+
+    public PipelineProtoVerificationVisitor(RunnerApi.Pipeline pipelineProto) {
+      this.pipelineProto = pipelineProto;
+      transforms = new HashSet<>();
+      pcollections = new HashSet<>();
+      coders = new HashSet<>();
+      windowingStrategies = new HashSet<>();
+    }
+
+    @Override
+    public void leaveCompositeTransform(Node node) {
+      if (node.isRootNode()) {
+        assertThat(
+            "Unexpected number of PTransforms",
+            pipelineProto.getComponents().getTransformsCount(),
+            equalTo(transforms.size()));
+        assertThat(
+            "Unexpected number of PCollections",
+            pipelineProto.getComponents().getPcollectionsCount(),
+            equalTo(pcollections.size()));
+        assertThat(
+            "Unexpected number of Coders",
+            pipelineProto.getComponents().getCodersCount(),
+            equalTo(coders.size()));
+        assertThat(
+            "Unexpected number of Windowing Strategies",
+            pipelineProto.getComponents().getWindowingStrategiesCount(),
+            equalTo(windowingStrategies.size()));
+      } else {
+        transforms.add(node);
+        if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
+            PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
+          // Combine translation introduces a coder that is not assigned to any PCollection
+          // in the default expansion, and must be explicitly added here.
+          try {
+            addCoders(
+                CombineTranslation.getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void visitPrimitiveTransform(Node node) {
+      transforms.add(node);
+    }
+
+    @Override
+    public void visitValue(PValue value, Node producer) {
+      if (value instanceof PCollection) {
+        PCollection pc = (PCollection) value;
+        pcollections.add(pc);
+        addCoders(pc.getCoder());
+        windowingStrategies.add(pc.getWindowingStrategy());
+        addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
+      }
+    }
+
+    private void addCoders(Coder<?> coder) {
+      coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
+      if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) {
+        for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
+          addCoders(component);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index ce6a99f..82840d6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,43 +24,25 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Equivalence;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.hamcrest.Matchers;
-import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -78,95 +60,6 @@ public class SdkComponentsTest {
   private SdkComponents components = SdkComponents.create();
 
   @Test
-  public void translatePipeline() {
-    BigEndianLongCoder customCoder = BigEndianLongCoder.of();
-    PCollection<Long> elems = pipeline.apply(GenerateSequence.from(0L).to(207L));
-    PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
-    PCollection<Long> windowed =
-        counted.apply(
-            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
-                .triggering(
-                    AfterWatermark.pastEndOfWindow()
-                        .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
-                .accumulatingFiredPanes()
-                .withAllowedLateness(Duration.standardMinutes(3L)));
-    final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
-    PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
-    PCollection<KV<String, Iterable<Long>>> grouped =
-        keyed.apply(GroupByKey.<String, Long>create());
-
-    final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
-    pipeline.traverseTopologically(
-        new PipelineVisitor.Defaults() {
-          Set<Node> transforms = new HashSet<>();
-          Set<PCollection<?>> pcollections = new HashSet<>();
-          Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
-          Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
-
-          @Override
-          public void leaveCompositeTransform(Node node) {
-            if (node.isRootNode()) {
-              assertThat(
-                  "Unexpected number of PTransforms",
-                  pipelineProto.getComponents().getTransformsCount(),
-                  equalTo(transforms.size()));
-              assertThat(
-                  "Unexpected number of PCollections",
-                  pipelineProto.getComponents().getPcollectionsCount(),
-                  equalTo(pcollections.size()));
-              assertThat(
-                  "Unexpected number of Coders",
-                  pipelineProto.getComponents().getCodersCount(),
-                  equalTo(coders.size()));
-              assertThat(
-                  "Unexpected number of Windowing Strategies",
-                  pipelineProto.getComponents().getWindowingStrategiesCount(),
-                  equalTo(windowingStrategies.size()));
-            } else {
-              transforms.add(node);
-              if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
-                  PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
-                // Combine translation introduces a coder that is not assigned to any PCollection
-                // in the default expansion, and must be explicitly added here.
-                try {
-                  addCoders(
-                      CombineTranslation.getAccumulatorCoder(
-                          node.toAppliedPTransform(getPipeline())));
-                } catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            }
-          }
-
-          @Override
-          public void visitPrimitiveTransform(Node node) {
-            transforms.add(node);
-          }
-
-          @Override
-          public void visitValue(PValue value, Node producer) {
-            if (value instanceof PCollection) {
-              PCollection pc = (PCollection) value;
-              pcollections.add(pc);
-              addCoders(pc.getCoder());
-              windowingStrategies.add(pc.getWindowingStrategy());
-              addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
-            }
-          }
-
-          private void addCoders(Coder<?> coder) {
-            coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
-            if (coder instanceof StructuredCoder) {
-              for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
-                addCoders(component);
-              }
-            }
-          }
-        });
-  }
-
-  @Test
   public void registerCoder() throws IOException {
     Coder<?> coder =
         KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 711da2a..0c433fa 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -92,7 +92,9 @@ message Pipeline {
   // this pipeline.
   Components components = 1;
 
-  // (Required) The ids of all PTransforms that are not contained within another PTransform
+  // (Required) The ids of all PTransforms that are not contained within another PTransform.
+  // These must be in shallow topological order, so that traversing them recursively
+  // in this order yields a recursively topological traversal.
   repeated string root_transform_ids = 2;
 
   // (Optional) Static display data for the pipeline. If there is none,

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index bdf8a12..760efb3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -180,6 +180,12 @@ public class Pipeline {
     return begin().apply(name, root);
   }
 
+  @Internal
+  public static Pipeline forTransformHierarchy(
+      TransformHierarchy transforms, PipelineOptions options) {
+    return new Pipeline(transforms, options);
+  }
+
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
@@ -476,16 +482,21 @@ public class Pipeline {
   /////////////////////////////////////////////////////////////////////////////
   // Below here are internal operations, never called by users.
 
-  private final TransformHierarchy transforms = new TransformHierarchy();
+  private final TransformHierarchy transforms;
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
   private final PipelineOptions defaultOptions;
 
-  protected Pipeline(PipelineOptions options) {
+  private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
+    this.transforms = transforms;
     this.defaultOptions = options;
   }
 
+  protected Pipeline(PipelineOptions options) {
+    this(new TransformHierarchy(), options);
+  }
+
   @Override
   public String toString() {
     return "Pipeline#" + hashCode();

http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d8ff59e..c2d5771 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
@@ -98,6 +99,48 @@ public class TransformHierarchy {
     return current;
   }
 
+  @Internal
+  public Node pushFinalizedNode(
+      String name,
+      Map<TupleTag<?>, PValue> inputs,
+      PTransform<?, ?> transform,
+      Map<TupleTag<?>, PValue> outputs) {
+    checkNotNull(
+        transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        inputs, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    Node node = new Node(current, transform, name, inputs, outputs);
+    node.finishedSpecifying = true;
+    current.addComposite(node);
+    current = node;
+    return current;
+  }
+
+  @Internal
+  public Node addFinalizedPrimitiveNode(
+      String name,
+      Map<TupleTag<?>, PValue> inputs,
+      PTransform<?, ?> transform,
+      Map<TupleTag<?>, PValue> outputs) {
+    checkNotNull(
+        transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        inputs, "Inputs must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    checkNotNull(
+        outputs, "Outputs must be provided for all %s Nodes", PTransform.class.getSimpleName());
+    Node node = new Node(current, transform, name, inputs, outputs);
+    node.finishedSpecifying = true;
+    for (PValue output : outputs.values()) {
+      producers.put(output, node);
+    }
+    current.addComposite(node);
+    return node;
+  }
+
   public Node replaceNode(Node existing, PInput input, PTransform<?, ?> transform) {
     checkNotNull(existing);
     checkNotNull(input);
@@ -321,6 +364,32 @@ public class TransformHierarchy {
     }
 
     /**
+     * Creates a new {@link Node} with the given parent and transform, where inputs and outputs
+     * are already known.
+     *
+     * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+     * nodes.
+     *
+     * @param enclosingNode the composite node containing this node
+     * @param transform the PTransform tracked by this node
+     * @param fullName the fully qualified name of the transform
+     * @param inputs the expanded inputs to the transform
+     * @param outputs the expanded outputs of the transform
+     */
+    private Node(
+        @Nullable Node enclosingNode,
+        @Nullable PTransform<?, ?> transform,
+        String fullName,
+        @Nullable Map<TupleTag<?>, PValue> inputs,
+        @Nullable Map<TupleTag<?>, PValue> outputs) {
+      this.enclosingNode = enclosingNode;
+      this.transform = transform;
+      this.fullName = fullName;
+      this.inputs = inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
+      this.outputs = outputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : outputs;
+    }
+
+    /**
      * Returns the transform associated with this transform node.
      */
     public PTransform<?, ?> getTransform() {


[4/5] beam git commit: Dehydrate then rehydrate Pipeline before DirectRunner.run()

Posted by ke...@apache.org.
Dehydrate then rehydrate Pipeline before DirectRunner.run()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ca45915
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ca45915
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ca45915

Branch: refs/heads/master
Commit: 8ca459158888693839edb14f824fa6835ebe3e67
Parents: 4348159
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:23:05 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 11 +++++-
 .../runners/direct/ViewOverrideFactoryTest.java | 41 --------------------
 2 files changed, 10 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4621224..c5f29e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -31,6 +32,7 @@ import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -156,7 +158,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   }
 
   @Override
-  public DirectPipelineResult run(Pipeline pipeline) {
+  public DirectPipelineResult run(Pipeline originalPipeline) {
+    Pipeline pipeline;
+    try {
+      pipeline = PipelineTranslation.fromProto(
+          PipelineTranslation.toProto(originalPipeline));
+    } catch (IOException exception) {
+      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+    }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();

http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6af9273..94d8d70 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -23,22 +23,17 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -62,42 +57,6 @@ public class ViewOverrideFactoryTest implements Serializable {
       new ViewOverrideFactory<>();
 
   @Test
-  public void replacementSucceeds() {
-    PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
-    final PCollectionView<List<Integer>> view =
-        PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
-    PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
-        replacementTransform =
-            factory.getReplacementTransform(
-                AppliedPTransform
-                    .<PCollection<Integer>, PCollection<Integer>,
-                        PTransform<PCollection<Integer>, PCollection<Integer>>>
-                        of(
-                            "foo",
-                            ints.expand(),
-                            view.expand(),
-                            CreatePCollectionView.<Integer, List<Integer>>of(view),
-                            p));
-    ints.apply(replacementTransform.getTransform());
-
-    PCollection<Set<Integer>> outputViewContents =
-        p.apply("CreateSingleton", Create.of(0))
-            .apply(
-                "OutputContents",
-                ParDo.of(
-                        new DoFn<Integer, Set<Integer>>() {
-                          @ProcessElement
-                          public void outputSideInput(ProcessContext context) {
-                            context.output(ImmutableSet.copyOf(context.sideInput(view)));
-                          }
-                        })
-                    .withSideInputs(view));
-    PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3));
-
-    p.run();
-  }
-
-  @Test
   public void replacementGetViewReturnsOriginal() {
     final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
     final PCollectionView<List<Integer>> view =


[2/5] beam git commit: Add stub DisplayDataTranslation

Posted by ke...@apache.org.
Add stub DisplayDataTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/efe2dc17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/efe2dc17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/efe2dc17

Branch: refs/heads/master
Commit: efe2dc17af2a66c5c33076467cb46f73bb7fb9ab
Parents: 0064fb3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 19 20:09:52 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:25 2017 -0700

----------------------------------------------------------------------
 .../construction/DisplayDataTranslation.java    | 39 ++++++++++++++++++++
 .../construction/PTransformTranslation.java     |  4 +-
 2 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/efe2dc17/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
new file mode 100644
index 0000000..ff7f9f2
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Utilities for going to/from DisplayData protos. */
+public class DisplayDataTranslation {
+  public static RunnerApi.DisplayData toProto(DisplayData displayData) {
+    // TODO https://issues.apache.org/jira/browse/BEAM-2645
+    return RunnerApi.DisplayData.newBuilder()
+        .addItems(
+            RunnerApi.DisplayData.Item.newBuilder()
+                .setId(RunnerApi.DisplayData.Identifier.newBuilder().setKey("stubImplementation"))
+                .setLabel("Stub implementation")
+                .setType(RunnerApi.DisplayData.Type.BOOLEAN)
+                .setValue(Any.pack(BoolValue.newBuilder().setValue(true).build())))
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/efe2dc17/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 3b94724..d459645 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -118,7 +119,8 @@ public class PTransformTranslation {
     }
 
     transformBuilder.setUniqueName(appliedPTransform.getFullName());
-    // TODO: Display Data
+    transformBuilder.setDisplayData(
+        DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
 
     PTransform<?, ?> transform = appliedPTransform.getTransform();
     // A RawPTransform directly vends its payload. Because it will generally be