You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/12/18 21:16:35 UTC
[beam] branch master updated: [BEAM-2929] Remove Dataflow
expansions for PCollectionView that have been migrated into the Dataflow
service for the portability framework.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9d9ffa5 [BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework.
9d9ffa5 is described below
commit 9d9ffa5f1a3a9f280dfafae15944764a568515ef
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Dec 12 16:27:58 2017 -0800
[BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework.
---
.../dataflow/DataflowPipelineTranslator.java | 37 +++++++++++++++++++---
.../beam/runners/dataflow/DataflowRunner.java | 17 ++++++----
2 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 4f9b939..5c26e0d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment;
import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
import static org.apache.beam.runners.dataflow.util.Structs.addDictionary;
import static org.apache.beam.runners.dataflow.util.Structs.addList;
@@ -84,6 +85,7 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -349,7 +351,7 @@ public class DataflowPipelineTranslator {
job.setLabels(options.getLabels());
}
if (options.isStreaming()
- && !DataflowRunner.hasExperiment(options, "enable_windmill_service")) {
+ && !hasExperiment(options, "enable_windmill_service")) {
// Use separate data disk for streaming.
Disk disk = new Disk();
disk.setDiskType(options.getWorkerDiskType());
@@ -447,13 +449,22 @@ public class DataflowPipelineTranslator {
public void visitValue(PValue value, TransformHierarchy.Node producer) {
LOG.debug("Checking translation of {}", value);
// Primitive transforms are the only ones assigned step names.
- if (producer.getTransform() instanceof CreateDataflowView) {
- // CreateDataflowView produces a dummy output (as it must be a primitive transform) but
- // in the Dataflow Job graph produces only the view and not the output PCollection.
+ if (producer.getTransform() instanceof CreateDataflowView
+ && !hasExperiment(options, "beam_fn_api")) {
+ // CreateDataflowView produces a dummy output (as it must be a primitive transform)
+ // but in the Dataflow Job graph produces only the view and not the output PCollection.
asOutputReference(
((CreateDataflowView) producer.getTransform()).getView(),
producer.toAppliedPTransform(getPipeline()));
return;
+ } else if (producer.getTransform() instanceof View.CreatePCollectionView
+ && hasExperiment(options, "beam_fn_api")) {
+ // View.CreatePCollectionView produces a dummy output (as it must be a primitive transform)
+ // but in the Dataflow Job graph produces only the view and not the output PCollection.
+ asOutputReference(
+ ((View.CreatePCollectionView) producer.getTransform()).getView(),
+ producer.toAppliedPTransform(getPipeline()));
+ return;
}
asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
@@ -680,6 +691,24 @@ public class DataflowPipelineTranslator {
static {
registerTransformTranslator(
+ View.CreatePCollectionView.class,
+ new TransformTranslator<View.CreatePCollectionView>() {
+ @Override
+ public void translate(View.CreatePCollectionView transform, TranslationContext context) {
+ translateTyped(transform, context);
+ }
+
+ private <ElemT, ViewT> void translateTyped(
+ View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+ StepTranslationContext stepContext =
+ context.addStep(transform, "CollectionToSingleton");
+ PCollection<ElemT> input = context.getInput(transform);
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
+ stepContext.addCollectionToSingletonOutput(input, transform.getView());
+ }
+ });
+
+ registerTransformTranslator(
CreateDataflowView.class,
new TransformTranslator<CreateDataflowView>() {
@Override
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ddad43f..942d36b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -382,11 +382,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new StreamingUnboundedReadOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
- new StreamingCreatePCollectionViewFactory()));
+ new StreamingUnboundedReadOverrideFactory()));
+ if (!hasExperiment(options, "beam_fn_api")) {
+ overridesBuilder.add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new StreamingCreatePCollectionViewFactory()));
+ }
} else {
overridesBuilder
// State and timer pardos are implemented by expansion to GBK-then-ParDo
@@ -397,7 +399,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
.add(
PTransformOverride.of(
PTransformMatchers.stateOrTimerParDoSingle(),
- BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
+ BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)));
+ if (!hasExperiment(options, "beam_fn_api")) {
+ overridesBuilder
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(View.AsMap.class),
@@ -422,6 +426,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.classEqualTo(View.AsIterable.class),
new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsIterable.class, this)));
+ }
}
// Expands into Reshuffle and single-output ParDo, so has to be before the overrides below.
if (hasExperiment(options, "beam_fn_api")) {
--
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.