You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/06 20:07:57 UTC
[2/5] beam git commit: Move some pieces of Dataflow translator to top level
Move some pieces of Dataflow translator to top level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d2cb3e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d2cb3e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d2cb3e2
Branch: refs/heads/master
Commit: 5d2cb3e2310dbf7046785e9e8f6403b854b2dd03
Parents: f04537c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 16:51:23 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 134 +------------------
.../beam/runners/dataflow/DataflowRunner.java | 8 +-
.../runners/dataflow/TransformTranslator.java | 123 +++++++++++++++++
.../dataflow/internal/ReadTranslator.java | 7 +-
.../DataflowPipelineTranslatorTest.java | 3 +-
.../runners/dataflow/DataflowRunnerTest.java | 5 +-
6 files changed, 137 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 2385fa1..e9cf6f4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,8 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -80,6 +83,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.PropertyNames;
@@ -212,130 +216,6 @@ public class DataflowPipelineTranslator {
return transformTranslators.get(transformClass);
}
- /**
- * A {@link TransformTranslator} knows how to translate a particular subclass of {@link
- * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link
- * TranslationContext}.
- */
- public interface TransformTranslator<TransformT extends PTransform> {
- void translate(TransformT transform, TranslationContext context);
- }
-
- /**
- * The interface provided to registered callbacks for interacting
- * with the {@link DataflowRunner}, including reading and writing the
- * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
- */
- public interface TranslationContext {
- /**
- * Returns the configured pipeline options.
- */
- DataflowPipelineOptions getPipelineOptions();
-
- /**
- * Returns the input of the currently being translated transform.
- */
- <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
- /**
- * Returns the output of the currently being translated transform.
- */
- <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
- /**
- * Returns the full name of the currently being translated transform.
- */
- String getFullName(PTransform<?, ?> transform);
-
- /**
- * Adds a step to the Dataflow workflow for the given transform, with
- * the given Dataflow step type.
- */
- StepTranslationContext addStep(PTransform<?, ?> transform, String type);
-
- /**
- * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
- * consistent with the Step, in terms of input, output and coder types.
- *
- * <p>This is a low-level operation, when using this method it is up to
- * the caller to ensure that names do not collide.
- */
- Step addStep(PTransform<?, ? extends PValue> transform, Step step);
- /**
- * Encode a PValue reference as an output reference.
- */
- OutputReference asOutputReference(PValue value);
- }
-
- public interface StepTranslationContext {
- /**
- * Sets the encoding for the current Dataflow step.
- */
- void addEncodingInput(Coder<?> value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- void addInput(String name, Boolean value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- void addInput(String name, String value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- void addInput(String name, Long value);
-
- /**
- * Adds an input with the given name to the previously added Dataflow
- * step, coming from the specified input PValue.
- */
- void addInput(String name, PInput value);
-
- /**
- * Adds an input that is a dictionary of strings to objects.
- */
- void addInput(String name, Map<String, Object> elements);
-
- /**
- * Adds an input that is a list of objects.
- */
- void addInput(String name, List<? extends Map<String, Object>> elements);
-
- /**
- * Adds an output to the previously added Dataflow step,
- * producing the specified output {@code PValue},
- * including its {@code Coder} if a {@code TypedPValue}. If the
- * {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code WindowedValueCoder}. Returns a pipeline level unique id.
- */
- long addOutput(PValue value);
-
- /**
- * Adds an output to the previously added Dataflow step,
- * producing the specified output {@code PValue},
- * including its {@code Coder} if a {@code TypedPValue}. If the
- * {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code ValueOnlyCoder}. Returns a pipeline level unique id.
- */
- long addValueOnlyOutput(PValue value);
-
- /**
- * Adds an output to the previously added CollectionToSingleton Dataflow step,
- * consuming the specified input {@code PValue} and producing the specified output
- * {@code PValue}. This step requires special treatment for its
- * output encoding. Returns a pipeline level unique id.
- */
- long addCollectionToSingletonOutput(PValue inputValue,
- PValue outputValue);
- }
-
-
/////////////////////////////////////////////////////////////////////////////
/**
@@ -838,11 +718,11 @@ public class DataflowPipelineTranslator {
DataflowPipelineTranslator.registerTransformTranslator(
Combine.GroupedValues.class,
- new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
+ new TransformTranslator<GroupedValues>() {
@Override
public void translate(
Combine.GroupedValues transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
translateHelper(transform, context);
}
@@ -1007,7 +887,7 @@ public class DataflowPipelineTranslator {
registerTransformTranslator(
Window.Bound.class,
- new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
+ new TransformTranslator<Bound>() {
@Override
public void translate(
Window.Bound transform, TranslationContext context) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d2c1e66..9da7d24 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,7 +33,6 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
-import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -73,9 +72,6 @@ import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.AssignWindows;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
@@ -2315,10 +2311,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
private static class ReadWithIdsTranslator
- implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> {
+ implements TransformTranslator<ReadWithIds<?>> {
@Override
public void translate(ReadWithIds<?> transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
new file mode 100644
index 0000000..2aa8327
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.Step;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * A {@link TransformTranslator} knows how to translate one particular subclass of {@link PTransform}
+ * for the Cloud Dataflow service. It does so in {@link #translate} by mutating the given context.
+ */
+public interface TransformTranslator<TransformT extends PTransform> {
+ void translate(TransformT transform, TranslationContext context);
+
+ /**
+ * The interface provided to registered translators for interacting with the {@link DataflowRunner},
+ * including reading and writing the values of {@link PCollection}s and side inputs.
+ */
+ interface TranslationContext {
+ /** Returns the configured pipeline options. */
+ DataflowPipelineOptions getPipelineOptions();
+
+ /** Returns the input of {@code transform}, the transform currently being translated. */
+ <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
+
+ /** Returns the output of {@code transform}, the transform currently being translated. */
+ <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
+
+ /** Returns the full name of {@code transform}, the transform currently being translated. */
+ String getFullName(PTransform<?, ?> transform);
+
+ /**
+ * Adds a step of the given Dataflow step {@code type} to the workflow for {@code transform}, and
+ * returns a {@link StepTranslationContext} for populating that step's inputs and outputs.
+ */
+ StepTranslationContext addStep(PTransform<?, ?> transform, String type);
+
+ /**
+ * Adds a pre-defined {@link Step} to the Dataflow workflow. The given {@link PTransform} should be
+ * consistent with {@code step} in terms of input, output, and coder types.
+ *
+ * <p>This is a low-level operation: when using this method, it is up to the caller to ensure
+ * that names do not collide.
+ */
+ Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+ /** Encodes a reference to {@code value} as an {@link OutputReference}. */
+ OutputReference asOutputReference(PValue value);
+ }
+
+ /** The interface a {@link TransformTranslator} uses to build a single Dataflow step. */
+ interface StepTranslationContext {
+ /** Sets the encoding for this Dataflow step from the given coder. */
+ void addEncodingInput(Coder<?> value);
+
+ /** Adds a boolean input with the given name and value to this Dataflow step. */
+ void addInput(String name, Boolean value);
+
+ /** Adds a string input with the given name and value to this Dataflow step. */
+ void addInput(String name, String value);
+
+ /** Adds a long input with the given name and value to this Dataflow step. */
+ void addInput(String name, Long value);
+
+ /**
+ * Adds an input with the given name to this Dataflow step, whose value comes from the given
+ * {@link PInput}.
+ */
+ void addInput(String name, PInput value);
+
+ /** Adds an input with the given name whose value is a map from strings to objects. */
+ void addInput(String name, Map<String, Object> elements);
+
+ /** Adds an input with the given name whose value is a list of string-to-object maps. */
+ void addInput(String name, List<? extends Map<String, Object>> elements);
+
+ /**
+ * Adds an output to this Dataflow step, producing the specified output {@code PValue} and
+ * including its {@code Coder} if it is a {@code TypedPValue}. If the {@code PValue} is a
+ * {@code PCollection}, its coder is wrapped in a {@code WindowedValueCoder}. Returns a
+ * pipeline-level unique id.
+ */
+ long addOutput(PValue value);
+
+ /**
+ * Adds an output to this Dataflow step, producing the specified output {@code PValue} and
+ * including its {@code Coder} if it is a {@code TypedPValue}. If the {@code PValue} is a
+ * {@code PCollection}, its coder is wrapped in a {@code ValueOnlyCoder}. Returns a
+ * pipeline-level unique id.
+ */
+ long addValueOnlyOutput(PValue value);
+
+ /**
+ * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
+ * input {@code PValue} and producing the specified output {@code PValue}. This step requires
+ * special treatment for its output encoding. Returns a pipeline-level unique id.
+ */
+ long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 1a5a9a5..a15a2a3 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -24,10 +24,7 @@ import static org.apache.beam.sdk.util.Structs.addLong;
import com.google.api.services.dataflow.model.SourceMetadata;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
@@ -47,7 +44,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
public static <T> void translateReadHelper(Source<T> source,
PTransform<?, ? extends PValue> transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
try {
// TODO: Move this validation out of translation once IOChannelUtils is portable
// and can be reconstructed on the worker.
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index ab82941..84b585a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -56,7 +56,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -566,7 +565,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
* {@link TranslationContext#addStep} and remaps the input port reference.
*/
private static class EmbeddedTranslator
- implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+ implements TransformTranslator<EmbeddedTransform> {
@Override public void translate(EmbeddedTransform transform, TranslationContext context) {
addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
context.asOutputReference(context.getInput(transform)));
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a19fd8c..4fff1c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,7 +61,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -989,12 +988,12 @@ public class DataflowRunnerTest {
DataflowPipelineTranslator.registerTransformTranslator(
TestTransform.class,
- new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
+ new TransformTranslator<TestTransform>() {
@SuppressWarnings("unchecked")
@Override
public void translate(
TestTransform transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
transform.translated = true;
// Note: This is about the minimum needed to fake out a