Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/06 20:07:57 UTC

[2/5] beam git commit: Move some pieces of Dataflow translator to top level

Move some pieces of Dataflow translator to top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d2cb3e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d2cb3e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d2cb3e2

Branch: refs/heads/master
Commit: 5d2cb3e2310dbf7046785e9e8f6403b854b2dd03
Parents: f04537c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 16:51:23 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 134 +------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   8 +-
 .../runners/dataflow/TransformTranslator.java   | 123 +++++++++++++++++
 .../dataflow/internal/ReadTranslator.java       |   7 +-
 .../DataflowPipelineTranslatorTest.java         |   3 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   5 +-
 6 files changed, 137 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
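
For readers skimming the diff below: the commit moves the TransformTranslator,
TranslationContext, and StepTranslationContext types out of
DataflowPipelineTranslator and into a new top-level
org.apache.beam.runners.dataflow.TransformTranslator, so translator
implementations now import the top-level type rather than qualifying it through
DataflowPipelineTranslator. A minimal before/after sketch of a registration
call site (MyTransform is a hypothetical transform used only for illustration;
registerTransformTranslator itself is unchanged by this commit):

    // Before: translator types nested inside DataflowPipelineTranslator.
    // DataflowPipelineTranslator.registerTransformTranslator(
    //     MyTransform.class,
    //     new DataflowPipelineTranslator.TransformTranslator<MyTransform>() { ... });

    // After: top-level TransformTranslator with its nested TranslationContext.
    DataflowPipelineTranslator.registerTransformTranslator(
        MyTransform.class,
        new TransformTranslator<MyTransform>() {
          @Override
          public void translate(MyTransform transform, TranslationContext context) {
            // Translation logic is untouched by this commit; only the types moved.
          }
        });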


http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 2385fa1..e9cf6f4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,8 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -80,6 +83,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -212,130 +216,6 @@ public class DataflowPipelineTranslator {
     return transformTranslators.get(transformClass);
   }
 
-  /**
-   * A {@link TransformTranslator} knows how to translate a particular subclass of {@link
-   * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link
-   * TranslationContext}.
-   */
-  public interface TransformTranslator<TransformT extends PTransform> {
-    void translate(TransformT transform, TranslationContext context);
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@link DataflowRunner}, including reading and writing the
-   * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
-   */
-  public interface TranslationContext {
-    /**
-     * Returns the configured pipeline options.
-     */
-    DataflowPipelineOptions getPipelineOptions();
-
-    /**
-     * Returns the input of the currently being translated transform.
-     */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
-    /**
-     * Returns the output of the currently being translated transform.
-     */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
-    /**
-     * Returns the full name of the currently being translated transform.
-     */
-    String getFullName(PTransform<?, ?> transform);
-
-    /**
-     * Adds a step to the Dataflow workflow for the given transform, with
-     * the given Dataflow step type.
-     */
-    StepTranslationContext addStep(PTransform<?, ?> transform, String type);
-
-    /**
-     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
-     * consistent with the Step, in terms of input, output and coder types.
-     *
-     * <p>This is a low-level operation, when using this method it is up to
-     * the caller to ensure that names do not collide.
-     */
-    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
-    /**
-     * Encode a PValue reference as an output reference.
-     */
-    OutputReference asOutputReference(PValue value);
-  }
-
-  public interface StepTranslationContext {
-    /**
-     * Sets the encoding for the current Dataflow step.
-     */
-    void addEncodingInput(Coder<?> value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    void addInput(String name, Boolean value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    void addInput(String name, String value);
-
-    /**
-     * Adds an input with the given name and value to the current
-     * Dataflow step.
-     */
-    void addInput(String name, Long value);
-
-    /**
-     * Adds an input with the given name to the previously added Dataflow
-     * step, coming from the specified input PValue.
-     */
-    void addInput(String name, PInput value);
-
-    /**
-     * Adds an input that is a dictionary of strings to objects.
-     */
-    void addInput(String name, Map<String, Object> elements);
-
-    /**
-     * Adds an input that is a list of objects.
-     */
-    void addInput(String name, List<? extends Map<String, Object>> elements);
-
-    /**
-     * Adds an output to the previously added Dataflow step,
-     * producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code WindowedValueCoder}.  Returns a pipeline level unique id.
-     */
-    long addOutput(PValue value);
-
-    /**
-     * Adds an output to the previously added Dataflow step,
-     * producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}.  If the
-     * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code ValueOnlyCoder}.  Returns a pipeline level unique id.
-     */
-    long addValueOnlyOutput(PValue value);
-
-    /**
-     * Adds an output to the previously added CollectionToSingleton Dataflow step,
-     * consuming the specified input {@code PValue} and producing the specified output
-     * {@code PValue}.  This step requires special treatment for its
-     * output encoding.  Returns a pipeline level unique id.
-     */
-    long addCollectionToSingletonOutput(PValue inputValue,
-        PValue outputValue);
-  }
-
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -838,11 +718,11 @@ public class DataflowPipelineTranslator {
 
     DataflowPipelineTranslator.registerTransformTranslator(
         Combine.GroupedValues.class,
-        new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
+        new TransformTranslator<GroupedValues>() {
           @Override
           public void translate(
               Combine.GroupedValues transform,
-              DataflowPipelineTranslator.TranslationContext context) {
+              TranslationContext context) {
             translateHelper(transform, context);
           }
 
@@ -1007,7 +887,7 @@ public class DataflowPipelineTranslator {
 
     registerTransformTranslator(
         Window.Bound.class,
-        new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
+        new TransformTranslator<Bound>() {
           @Override
           public void translate(
               Window.Bound transform, TranslationContext context) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d2c1e66..9da7d24 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,7 +33,6 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
-import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -73,9 +72,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.AssignWindows;
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
@@ -2315,10 +2311,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     private static class ReadWithIdsTranslator
-        implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> {
+        implements TransformTranslator<ReadWithIds<?>> {
       @Override
       public void translate(ReadWithIds<?> transform,
-          DataflowPipelineTranslator.TranslationContext context) {
+          TranslationContext context) {
         ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
new file mode 100644
index 0000000..2aa8327
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.Step;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
+ * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
+ */
+public interface TransformTranslator<TransformT extends PTransform> {
+  void translate(TransformT transform, TranslationContext context);
+
+  /**
+   * The interface provided to registered callbacks for interacting with the {@link DataflowRunner},
+   * including reading and writing the values of {@link PCollection}s and side inputs.
+   */
+  interface TranslationContext {
+    /** Returns the configured pipeline options. */
+    DataflowPipelineOptions getPipelineOptions();
+
+    /** Returns the input of the currently being translated transform. */
+    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
+
+    /** Returns the output of the currently being translated transform. */
+    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
+
+    /** Returns the full name of the currently being translated transform. */
+    String getFullName(PTransform<?, ?> transform);
+
+    /**
+     * Adds a step to the Dataflow workflow for the given transform, with the given Dataflow step
+     * type.
+     */
+    StepTranslationContext addStep(PTransform<?, ?> transform, String type);
+
+    /**
+     * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent
+     * with the Step, in terms of input, output and coder types.
+     *
+     * <p>This is a low-level operation, when using this method it is up to the caller to ensure
+     * that names do not collide.
+     */
+    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+    /** Encode a PValue reference as an output reference. */
+    OutputReference asOutputReference(PValue value);
+  }
+
+  /** The interface for a {@link TransformTranslator} to build a Dataflow step. */
+  interface StepTranslationContext {
+    /** Sets the encoding for this Dataflow step. */
+    void addEncodingInput(Coder<?> value);
+
+    /** Adds an input with the given name and value to this Dataflow step. */
+    void addInput(String name, Boolean value);
+
+    /** Adds an input with the given name and value to this Dataflow step. */
+    void addInput(String name, String value);
+
+    /** Adds an input with the given name and value to this Dataflow step. */
+    void addInput(String name, Long value);
+
+    /**
+     * Adds an input with the given name to this Dataflow step, coming from the specified input
+     * PValue.
+     */
+    void addInput(String name, PInput value);
+
+    /** Adds an input that is a dictionary of strings to objects. */
+    void addInput(String name, Map<String, Object> elements);
+
+    /** Adds an input that is a list of objects. */
+    void addInput(String name, List<? extends Map<String, Object>> elements);
+
+    /**
+     * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+     * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
+     * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level
+     * unique id.
+     */
+    long addOutput(PValue value);
+
+    /**
+     * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+     * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
+     * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level
+     * unique id.
+     */
+    long addValueOnlyOutput(PValue value);
+
+    /**
+     * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
+     * input {@code PValue} and producing the specified output {@code PValue}. This step requires
+     * special treatment for its output encoding. Returns a pipeline level unique id.
+     */
+    long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue);
+  }
+}
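
As a companion to the interfaces documented in the new file above, here is a
rough sketch of a standalone translator written against the relocated types.
FooTransform, the "FooStepKind" string, and the property names are hypothetical
placeholders, not part of this commit; the methods used (addStep,
addEncodingInput, addInput, addOutput, getFullName, getOutput) are the ones
declared above.

    import org.apache.beam.runners.dataflow.TransformTranslator;
    import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
    import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;

    // Sketch only: FooTransform is assumed to be a PTransform whose output is a
    // single PCollection (and therefore a PValue that carries a Coder).
    class FooTranslator implements TransformTranslator<FooTransform> {
      @Override
      public void translate(FooTransform transform, TranslationContext context) {
        // Open a Dataflow step of a (hypothetical) kind for this transform.
        StepTranslationContext step = context.addStep(transform, "FooStepKind");
        // Record the output coder as the step's encoding.
        step.addEncodingInput(context.getOutput(transform).getCoder());
        // Attach a couple of named inputs of the supported primitive types.
        step.addInput("foo_enabled", Boolean.TRUE);
        step.addInput("foo_full_name", context.getFullName(transform));
        // Register the transform's output; the returned id is unique per pipeline.
        long outputId = step.addOutput(context.getOutput(transform));
      }
    }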

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 1a5a9a5..a15a2a3 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -24,10 +24,7 @@ import static org.apache.beam.sdk.util.Structs.addLong;
 import com.google.api.services.dataflow.model.SourceMetadata;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
@@ -47,7 +44,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
 
   public static <T> void translateReadHelper(Source<T> source,
       PTransform<?, ? extends PValue> transform,
-      DataflowPipelineTranslator.TranslationContext context) {
+      TranslationContext context) {
     try {
       // TODO: Move this validation out of translation once IOChannelUtils is portable
       // and can be reconstructed on the worker.

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index ab82941..84b585a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -566,7 +565,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
    * {@link TranslationContext#addStep} and remaps the input port reference.
    */
   private static class EmbeddedTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+      implements TransformTranslator<EmbeddedTransform> {
     @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
       addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
           context.asOutputReference(context.getInput(transform)));

http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a19fd8c..4fff1c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,7 +61,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -989,12 +988,12 @@ public class DataflowRunnerTest {
 
     DataflowPipelineTranslator.registerTransformTranslator(
         TestTransform.class,
-        new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
+        new TransformTranslator<TestTransform>() {
           @SuppressWarnings("unchecked")
           @Override
           public void translate(
               TestTransform transform,
-              DataflowPipelineTranslator.TranslationContext context) {
+              TranslationContext context) {
             transform.translated = true;
 
             // Note: This is about the minimum needed to fake out a