You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/03 16:18:17 UTC
[2/2] beam git commit: Add a Dataflow-specific primitive for creating a view
Add a Dataflow-specific primitive for creating a view
Allows overrides of CreatePCollectionView to work with the batch
override API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ce88e1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ce88e1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ce88e1d
Branch: refs/heads/master
Commit: 1ce88e1db5c75bedf55bea38786f55818627e29e
Parents: fe1d412
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 30 18:09:26 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 3 09:18:06 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 8 +-
.../runners/dataflow/CreateDataflowView.java | 46 +++++++++++
.../dataflow/DataflowPipelineTranslator.java | 9 +--
.../dataflow/StreamingViewOverrides.java | 3 +-
.../DataflowPipelineTranslatorTest.java | 80 +-------------------
5 files changed, 58 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index af96403..86bfeb6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -831,7 +831,7 @@ class BatchViewOverrides {
return Pipeline.applyTransform(outputs,
Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
- .apply(CreatePCollectionView.<IsmRecord<WindowedValue<V>>,
+ .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>,
ViewT>of(view));
}
@@ -975,7 +975,7 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
return reifiedPerWindowAndSorted.apply(
- CreatePCollectionView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
+ CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
}
@Override
@@ -1119,7 +1119,7 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
return reifiedPerWindowAndSorted.apply(
- CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
+ CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
}
PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1129,7 +1129,7 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
return reifiedPerWindowAndSorted.apply(
- CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
+ CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
new file mode 100644
index 0000000..e7542cb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** {@link DataflowRunner}-specific primitive marking the creation of a {@link PCollectionView}. */
+public class CreateDataflowView<ElemT, ViewT>
+ extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
+ return new CreateDataflowView<>(view);
+ }
+
+ private final PCollectionView<ViewT> view; // returned unchanged by expand() and getView()
+
+ private CreateDataflowView(PCollectionView<ViewT> view) { // private; callers use of()
+ this.view = view;
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+ return view; // input is unused here; DataflowPipelineTranslator adds the CollectionToSingleton step
+ }
+
+ public PCollectionView<ViewT> getView() { // the same view that was passed to of()
+ return view;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 6d231b9..9b80756 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -79,7 +79,6 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -691,15 +690,15 @@ public class DataflowPipelineTranslator {
static {
registerTransformTranslator(
- View.CreatePCollectionView.class,
- new TransformTranslator<View.CreatePCollectionView>() {
+ CreateDataflowView.class,
+ new TransformTranslator<CreateDataflowView>() {
@Override
- public void translate(View.CreatePCollectionView transform, TranslationContext context) {
+ public void translate(CreateDataflowView transform, TranslationContext context) {
translateTyped(transform, context);
}
private <ElemT, ViewT> void translateTyped(
- View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+ CreateDataflowView<ElemT, ViewT> transform, TranslationContext context) {
StepTranslationContext stepContext =
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 5f0cb26..c407517 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -61,7 +60,7 @@ class StreamingViewOverrides {
return input
.apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view));
+ .apply(CreateDataflowView.<ElemT, ViewT>of(view));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index eb55566..5016d88 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -135,6 +135,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ runner.replaceTransforms(p);
return p;
}
@@ -185,7 +187,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineOptions options = buildPipelineOptions();
options.setRunner(DataflowRunner.class);
- Pipeline p = buildPipeline(options);
+ Pipeline p = Pipeline.create(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
@@ -769,82 +771,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Collections.<DataflowPackage>emptyList());
}
- @Test
- public void testToSingletonTranslation() throws Exception {
- // A "change detector" test that makes sure the translation
- // of getting a PCollectionView<T> does not change
- // in bad ways during refactor
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setExperiments(ImmutableList.of("disable_ism_side_input"));
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(Create.of(1))
- .apply(View.<Integer>asSingleton());
- Job job =
- translator
- .translate(
- pipeline,
- DataflowRunner.fromOptions(options),
- Collections.<DataflowPackage>emptyList())
- .getJob();
- assertAllStepOutputsHaveUniqueIds(job);
-
- List<Step> steps = job.getSteps();
- assertEquals(6, steps.size());
-
- Step createStep = steps.get(0);
- assertEquals("ParallelRead", createStep.getKind());
-
- Step addNullKeyStep = steps.get(1);
- assertEquals("ParallelDo", addNullKeyStep.getKind());
-
- Step groupByKeyStep = steps.get(2);
- assertEquals("GroupByKey", groupByKeyStep.getKind());
-
- Step combineGroupedValuesStep = steps.get(3);
- assertEquals("ParallelDo", combineGroupedValuesStep.getKind());
-
- Step dropKeysStep = steps.get(4);
- assertEquals("ParallelDo", dropKeysStep.getKind());
-
- Step collectionToSingletonStep = steps.get(5);
- assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
- }
-
- @Test
- public void testToIterableTranslation() throws Exception {
- // A "change detector" test that makes sure the translation
- // of getting a PCollectionView<Iterable<T>> does not change
- // in bad ways during refactor
-
- DataflowPipelineOptions options = buildPipelineOptions();
- options.setExperiments(ImmutableList.of("disable_ism_side_input"));
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
- pipeline.apply(Create.of(1, 2, 3))
- .apply(View.<Integer>asIterable());
- Job job =
- translator
- .translate(
- pipeline,
- DataflowRunner.fromOptions(options),
- Collections.<DataflowPackage>emptyList())
- .getJob();
- assertAllStepOutputsHaveUniqueIds(job);
-
- List<Step> steps = job.getSteps();
- assertEquals(2, steps.size());
-
- Step createStep = steps.get(0);
- assertEquals("ParallelRead", createStep.getKind());
-
- Step collectionToSingletonStep = steps.get(1);
- assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
- }
-
/**
* Smoke test to fail fast if translation of a stateful ParDo
* in batch breaks.