You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/06/05 17:19:43 UTC
[1/2] beam git commit: Remove the requirement to visit
PCollectionViews in Dataflow
Repository: beam
Updated Branches:
refs/heads/master 09d75a0b0 -> 6543e56d2
Remove the requirement to visit PCollectionViews in Dataflow
Remove the unused addStep method in the Dataflow Translator.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e82bc4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e82bc4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e82bc4b
Branch: refs/heads/master
Commit: 7e82bc4b280cd35fca042a50f0055cd68850da68
Parents: 09d75a0
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 31 15:42:03 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jun 5 10:19:21 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 72 +++------------
.../beam/runners/dataflow/ReadTranslator.java | 6 +-
.../runners/dataflow/TransformTranslator.java | 26 +++---
.../DataflowPipelineTranslatorTest.java | 95 --------------------
4 files changed, 24 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index af93ef5..8eaf61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -438,12 +438,9 @@ public class DataflowPipelineTranslator {
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
- producers.put(value, producer.toAppliedPTransform(getPipeline()));
LOG.debug("Checking translation of {}", value);
- if (!producer.isCompositeNode()) {
- // Primitive transforms are the only ones assigned step names.
- asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
- }
+ // Primitive transforms are the only ones assigned step names.
+ asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
@Override
@@ -471,48 +468,6 @@ public class DataflowPipelineTranslator {
return stepContext;
}
- @Override
- public Step addStep(PTransform<?, ? extends PValue> transform, Step original) {
- Step step = original.clone();
- String stepName = step.getName();
- if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
- throw new IllegalArgumentException(transform + " already has a name specified");
- }
-
- Map<String, Object> properties = step.getProperties();
- if (properties != null) {
- @Nullable List<Map<String, Object>> outputInfoList = null;
- try {
- // TODO: This should be done via a Structs accessor.
- @Nullable List<Map<String, Object>> list =
- (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
- outputInfoList = list;
- } catch (Exception e) {
- throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
- }
- if (outputInfoList != null && outputInfoList.size() > 0) {
- Map<String, Object> firstOutputPort = outputInfoList.get(0);
- @Nullable String name;
- try {
- name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
- } catch (Exception e) {
- name = null;
- }
- if (name != null) {
- registerOutputName(getOutput(transform), name);
- }
- }
- }
-
- List<Step> steps = job.getSteps();
- if (steps == null) {
- steps = new LinkedList<>();
- job.setSteps(steps);
- }
- steps.add(step);
- return step;
- }
-
public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer) {
String stepName = stepNames.get(producer);
checkArgument(stepName != null, "%s doesn't have a name specified", producer);
@@ -607,26 +562,19 @@ public class DataflowPipelineTranslator {
}
@Override
- public long addOutput(PValue value) {
- Coder<?> coder;
- if (value instanceof PCollection) {
- coder = ((PCollection<?>) value).getCoder();
- if (value instanceof PCollection) {
- // Wrap the PCollection element Coder inside a WindowedValueCoder.
- coder = WindowedValue.getFullCoder(
- coder,
- ((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder());
- }
- } else {
- // No output coder to encode.
- coder = null;
- }
+ public long addOutput(PCollection<?> value) {
+ translator.producers.put(value, translator.currentTransform);
+ // Wrap the PCollection element Coder inside a WindowedValueCoder.
+ Coder<?> coder =
+ WindowedValue.getFullCoder(
+ value.getCoder(), value.getWindowingStrategy().getWindowFn().windowCoder());
return addOutput(value, coder);
}
@Override
public long addCollectionToSingletonOutput(
- PValue inputValue, PValue outputValue) {
+ PCollection<?> inputValue, PCollectionView<?> outputValue) {
+ translator.producers.put(outputValue, translator.currentTransform);
Coder<?> inputValueCoder =
checkNotNull(translator.outputCoders.get(inputValue));
// The inputValueCoder for the input PCollection should be some
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index 0b22d7e..693748a 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PCollection;
/**
* Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
@@ -41,7 +41,9 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
}
public static <T> void translateReadHelper(
- Source<T> source, PTransform<?, ? extends PValue> transform, TranslationContext context) {
+ Source<T> source,
+ PTransform<?, ? extends PCollection<?>> transform,
+ TranslationContext context) {
try {
StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 23949bd..a7452b2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -17,15 +17,16 @@
*/
package org.apache.beam.runners.dataflow;
-import com.google.api.services.dataflow.model.Step;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
@@ -65,14 +66,6 @@ interface TransformTranslator<TransformT extends PTransform> {
*/
StepTranslationContext addStep(PTransform<?, ?> transform, String type);
- /**
- * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent
- * with the Step, in terms of input, output and coder types.
- *
- * <p>This is a low-level operation, when using this method it is up to the caller to ensure
- * that names do not collide.
- */
- Step addStep(PTransform<?, ? extends PValue> transform, Step step);
/** Encode a PValue reference as an output reference. */
OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer);
@@ -100,10 +93,11 @@ interface TransformTranslator<TransformT extends PTransform> {
* Adds an input with the given name to this Dataflow step, coming from the specified input
* PValue.
*
- * <p>The input {@link PValue} must have already been produced by a step earlier in this {@link
- * Pipeline}. If the input value has not yet been produced yet (either by a call to {@link
- * StepTranslationContext#addOutput(PValue)} or within a call to {@link
- * TranslationContext#addStep(PTransform, Step)}), this method will throw an exception.
+ * <p>The input {@link PValue} must have already been produced by a step earlier in this
+ * {@link Pipeline}. If the input value has not yet been produced (by a call to either
+ * {@link StepTranslationContext#addOutput(PCollection)} or
+ * {@link StepTranslationContext#addCollectionToSingletonOutput(PCollection, PCollectionView)})
+ * this method will throw an exception.
*/
void addInput(String name, PInput value);
@@ -114,18 +108,18 @@ interface TransformTranslator<TransformT extends PTransform> {
void addInput(String name, List<? extends Map<String, Object>> elements);
/**
- * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+ * Adds a primitive output to this Dataflow step, producing the specified output {@code PValue},
* including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
* PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level
* unique id.
*/
- long addOutput(PValue value);
+ long addOutput(PCollection<?> value);
/**
* Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
* input {@code PValue} and producing the specified output {@code PValue}. This step requires
* special treatment for its output encoding. Returns a pipeline level unique id.
*/
- long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue);
+ long addCollectionToSingletonOutput(PCollection<?> inputValue, PCollectionView<?> outputValue);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 87744f0..89dc2d5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.dataflow;
-import static org.apache.beam.runners.dataflow.util.Structs.addObject;
-import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
@@ -73,7 +71,6 @@ import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.Structs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -531,57 +528,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
}
- @Test
- public void testPredefinedAddStep() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
-
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
- DataflowPipelineTranslator.registerTransformTranslator(
- EmbeddedTransform.class, new EmbeddedTranslator());
-
- // Create a predefined step using another pipeline
- Step predefinedStep = createPredefinedStep();
-
- // Create a pipeline that the predefined step will be embedded into
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
- .apply(ParDo.of(new NoOpFn()))
- .apply(new EmbeddedTransform(predefinedStep.clone()))
- .apply(ParDo.of(new NoOpFn()));
- DataflowRunner runner = DataflowRunner.fromOptions(options);
- runner.replaceTransforms(pipeline);
- Job job =
- translator
- .translate(
- pipeline,
- runner,
- Collections.<DataflowPackage>emptyList())
- .getJob();
- assertAllStepOutputsHaveUniqueIds(job);
-
- List<Step> steps = job.getSteps();
- assertEquals(4, steps.size());
-
- // The input to the embedded step should match the output of the step before
- Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
- Map<String, Object> step2In = getDictionary(
- steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
- assertEquals(step1Out, step2In);
-
- // The output from the embedded step should match the input of the step after
- Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
- Map<String, Object> step3In = getDictionary(
- steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
- assertEquals(step2Out, step3In);
-
- // The step should not have been modified other than remapping the input
- Step predefinedStepClone = predefinedStep.clone();
- Step embeddedStepClone = steps.get(2).clone();
- predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
- embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
- assertEquals(predefinedStepClone, embeddedStepClone);
- }
-
/**
* Construct a OutputReference for the output of the step.
*/
@@ -630,47 +576,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
/**
- * A placeholder transform that will be used to substitute a predefined Step.
- */
- private static class EmbeddedTransform
- extends PTransform<PCollection<String>, PCollection<String>> {
- private final Step step;
-
- public EmbeddedTransform(Step step) {
- this.step = step;
- }
-
- @Override
- public PCollection<String> expand(PCollection<String> input) {
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- input.isBounded());
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
- }
-
- /**
- * A TransformTranslator that adds the predefined Step using
- * {@link TranslationContext#addStep} and remaps the input port reference.
- */
- private static class EmbeddedTranslator
- implements TransformTranslator<EmbeddedTransform> {
- @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
- PCollection<String> input = context.getInput(transform);
- addObject(
- transform.step.getProperties(),
- PropertyNames.PARALLEL_INPUT,
- context.asOutputReference(input, context.getProducer(input)));
- context.addStep(transform, transform.step);
- }
- }
-
- /**
* A composite transform that returns an output that is unrelated to
* the input.
*/
[2/2] beam git commit: This closes #3272
Posted by tg...@apache.org.
This closes #3272
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6543e56d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6543e56d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6543e56d
Branch: refs/heads/master
Commit: 6543e56d264f56f8c7cd92afd71c5311a859895c
Parents: 09d75a0 7e82bc4
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 5 10:19:32 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jun 5 10:19:32 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 72 +++------------
.../beam/runners/dataflow/ReadTranslator.java | 6 +-
.../runners/dataflow/TransformTranslator.java | 26 +++---
.../DataflowPipelineTranslatorTest.java | 95 --------------------
4 files changed, 24 insertions(+), 175 deletions(-)
----------------------------------------------------------------------