You are viewing a plain-text version of this content. The canonical version is the original post in the commits@beam.apache.org mailing-list archive.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/03 16:18:16 UTC

[1/2] beam git commit: This closes #2387

Repository: beam
Updated Branches:
  refs/heads/master fe1d4124e -> 9beb04ed7


This closes #2387


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9beb04ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9beb04ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9beb04ed

Branch: refs/heads/master
Commit: 9beb04ed74c28db8c680bb1d59eace5dc25efe08
Parents: fe1d412 1ce88e1
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 3 09:18:06 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 3 09:18:06 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    |  8 +-
 .../runners/dataflow/CreateDataflowView.java    | 46 +++++++++++
 .../dataflow/DataflowPipelineTranslator.java    |  9 +--
 .../dataflow/StreamingViewOverrides.java        |  3 +-
 .../DataflowPipelineTranslatorTest.java         | 80 +-------------------
 5 files changed, 58 insertions(+), 88 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Add a Dataflow-specific primitive for creating a view

Posted by tg...@apache.org.
Add a Dataflow-specific primitive for creating a view

Allows overrides of CreatePCollectionView to work with the batch
override API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ce88e1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ce88e1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ce88e1d

Branch: refs/heads/master
Commit: 1ce88e1db5c75bedf55bea38786f55818627e29e
Parents: fe1d412
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 30 18:09:26 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 3 09:18:06 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    |  8 +-
 .../runners/dataflow/CreateDataflowView.java    | 46 +++++++++++
 .../dataflow/DataflowPipelineTranslator.java    |  9 +--
 .../dataflow/StreamingViewOverrides.java        |  3 +-
 .../DataflowPipelineTranslatorTest.java         | 80 +-------------------
 5 files changed, 58 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index af96403..86bfeb6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -831,7 +831,7 @@ class BatchViewOverrides {
 
       return Pipeline.applyTransform(outputs,
           Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
-          .apply(CreatePCollectionView.<IsmRecord<WindowedValue<V>>,
+          .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>,
               ViewT>of(view));
     }
 
@@ -975,7 +975,7 @@ class BatchViewOverrides {
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
       return reifiedPerWindowAndSorted.apply(
-          CreatePCollectionView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
+          CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
     }
 
     @Override
@@ -1119,7 +1119,7 @@ class BatchViewOverrides {
 
         runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
         return reifiedPerWindowAndSorted.apply(
-            CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
+            CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
       }
 
       PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1129,7 +1129,7 @@ class BatchViewOverrides {
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
       return reifiedPerWindowAndSorted.apply(
-          CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
+          CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
new file mode 100644
index 0000000..e7542cb
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** {@link DataflowRunner} marker primitive for creating a {@link PCollectionView}. */
+public class CreateDataflowView<ElemT, ViewT>
+    extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
+    return new CreateDataflowView<>(view); // view is stored as-is; no null check is done
+  }
+
+  private final PCollectionView<ViewT> view; // returned unchanged by expand() and getView()
+
+  private CreateDataflowView(PCollectionView<ViewT> view) {
+    this.view = view;
+  }
+
+  @Override
+  public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+    return view; // pass-through; DataflowPipelineTranslator adds the CollectionToSingleton step
+  }
+
+  public PCollectionView<ViewT> getView() { // the view originally supplied to of()
+    return view;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 6d231b9..9b80756 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -79,7 +79,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -691,15 +690,15 @@ public class DataflowPipelineTranslator {
 
   static {
     registerTransformTranslator(
-        View.CreatePCollectionView.class,
-        new TransformTranslator<View.CreatePCollectionView>() {
+        CreateDataflowView.class,
+        new TransformTranslator<CreateDataflowView>() {
           @Override
-          public void translate(View.CreatePCollectionView transform, TranslationContext context) {
+          public void translate(CreateDataflowView transform, TranslationContext context) {
             translateTyped(transform, context);
           }
 
           private <ElemT, ViewT> void translateTyped(
-              View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+              CreateDataflowView<ElemT, ViewT> transform, TranslationContext context) {
             StepTranslationContext stepContext =
                 context.addStep(transform, "CollectionToSingleton");
             PCollection<ElemT> input = context.getInput(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 5f0cb26..c407517 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -61,7 +60,7 @@ class StreamingViewOverrides {
         return input
             .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
             .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-            .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view));
+            .apply(CreateDataflowView.<ElemT, ViewT>of(view));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index eb55566..5016d88 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -135,6 +135,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
     p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
      .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    runner.replaceTransforms(p);
 
     return p;
   }
@@ -185,7 +187,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setRunner(DataflowRunner.class);
 
-    Pipeline p = buildPipeline(options);
+    Pipeline p = Pipeline.create(options);
     p.traverseTopologically(new RecordingPipelineVisitor());
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
@@ -769,82 +771,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         Collections.<DataflowPackage>emptyList());
   }
 
-  @Test
-  public void testToSingletonTranslation() throws Exception {
-    // A "change detector" test that makes sure the translation
-    // of getting a PCollectionView<T> does not change
-    // in bad ways during refactor
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(Create.of(1))
-        .apply(View.<Integer>asSingleton());
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                DataflowRunner.fromOptions(options),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-    assertAllStepOutputsHaveUniqueIds(job);
-
-    List<Step> steps = job.getSteps();
-    assertEquals(6, steps.size());
-
-    Step createStep = steps.get(0);
-    assertEquals("ParallelRead", createStep.getKind());
-
-    Step addNullKeyStep = steps.get(1);
-    assertEquals("ParallelDo", addNullKeyStep.getKind());
-
-    Step groupByKeyStep = steps.get(2);
-    assertEquals("GroupByKey", groupByKeyStep.getKind());
-
-    Step combineGroupedValuesStep = steps.get(3);
-    assertEquals("ParallelDo", combineGroupedValuesStep.getKind());
-
-    Step dropKeysStep = steps.get(4);
-    assertEquals("ParallelDo", dropKeysStep.getKind());
-
-    Step collectionToSingletonStep = steps.get(5);
-    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-  }
-
-  @Test
-  public void testToIterableTranslation() throws Exception {
-    // A "change detector" test that makes sure the translation
-    // of getting a PCollectionView<Iterable<T>> does not change
-    // in bad ways during refactor
-
-    DataflowPipelineOptions options = buildPipelineOptions();
-    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(Create.of(1, 2, 3))
-        .apply(View.<Integer>asIterable());
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                DataflowRunner.fromOptions(options),
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-    assertAllStepOutputsHaveUniqueIds(job);
-
-    List<Step> steps = job.getSteps();
-    assertEquals(2, steps.size());
-
-    Step createStep = steps.get(0);
-    assertEquals("ParallelRead", createStep.getKind());
-
-    Step collectionToSingletonStep = steps.get(1);
-    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-  }
-
   /**
    * Smoke test to fail fast if translation of a stateful ParDo
    * in batch breaks.