Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:24 UTC

[33/50] [abbrv] beam git commit: Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.

Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.


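A minimal usage sketch (not part of the commit; the pipeline, step name, and example values below are illustrative). It shows the kind of pipeline this change affects: Combine.Globally#asSingletonView expands to Combine.GloballyAsSingletonView, and on the Dataflow batch runner that transform is now replaced by BatchViewOverrides.BatchViewAsSingleton, which re-applies the CombineFn (with the configured fanout) instead of dropping it during replacement.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    public class SingletonViewExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<Long> values = pipeline.apply(Create.of(1L, 2L, 3L));

        // Combine.Globally#asSingletonView applies Combine.GloballyAsSingletonView,
        // the transform whose Dataflow batch override is changed by this commit so
        // that the CombineFn (here Sum) is applied rather than discarded.
        PCollectionView<Long> sumView =
            values.apply("SumAsSingletonView", Sum.longsGlobally().asSingletonView());

        pipeline.run();
      }
    }
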
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/accb2087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/accb2087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/accb2087

Branch: refs/heads/tez-runner
Commit: accb2087f88c641be9db038bbb5be715aacffb8d
Parents: c2f815c
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 19:19:48 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    | 16 +++-
 .../runners/dataflow/CreateDataflowView.java    |  6 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 80 +++++++++++++++-----
 .../DataflowPipelineTranslatorTest.java         |  6 +-
 4 files changed, 80 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 8ed41cb..2953a42 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -55,6 +55,8 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -893,15 +895,23 @@ class BatchViewOverrides {
 
     private final DataflowRunner runner;
     private final PCollectionView<T> view;
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsSingleton(DataflowRunner runner, CreatePCollectionView<T, T> transform) {
+    private final CombineFn<T, ?, T> combineFn;
+    private final int fanout;
+
+    public BatchViewAsSingleton(
+        DataflowRunner runner,
+        CreatePCollectionView<T, T> transform,
+        CombineFn<T, ?, T> combineFn,
+        int fanout) {
       this.runner = runner;
       this.view = transform.getView();
+      this.combineFn = combineFn;
+      this.fanout = fanout;
     }
 
     @Override
     public PCollection<?> expand(PCollection<T> input) {
+      input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
           input.getWindowingStrategy().getWindowFn().windowCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 10888c2..f64f3fb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,11 +25,13 @@ import org.apache.beam.sdk.values.PCollectionView;
 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
-  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(
+      PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view, false);
   }
 
-  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(
+      PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view, true);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 72e4f83..a650092 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -116,6 +116,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.state.MapState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -406,9 +407,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                       BatchViewOverrides.BatchViewAsMultimap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                  new ReflectiveViewOverrideFactory(
-                      BatchViewOverrides.BatchViewAsSingleton.class, this)))
+                  PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                  new CombineGloballyAsSingletonViewOverrideFactory(this)))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(View.AsList.class),
@@ -437,29 +437,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
-   * new replacement transform uses the supplied PCollectionView and does not create another instance.
+   * Replace the {@link Combine.GloballyAsSingletonView} transform with a specialization which
+   * re-applies the {@link CombineFn} and adds a specialization specific to the Dataflow runner.
+   */
+  private static class CombineGloballyAsSingletonViewOverrideFactory<InputT, ViewT>
+      extends ReflectiveViewOverrideFactory<InputT, ViewT> {
+
+    private CombineGloballyAsSingletonViewOverrideFactory(DataflowRunner runner) {
+      super((Class) BatchViewOverrides.BatchViewAsSingleton.class, runner);
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+        AppliedPTransform<
+            PCollection<InputT>,
+            PValue,
+            PTransform<PCollection<InputT>, PValue>> transform) {
+      Combine.GloballyAsSingletonView<?, ?> combineTransform =
+          (Combine.GloballyAsSingletonView) transform.getTransform();
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new BatchViewOverrides.BatchViewAsSingleton(
+              runner,
+              findCreatePCollectionView(transform),
+              (CombineFn) combineTransform.getCombineFn(),
+              combineTransform.getFanout()));
+    }
+  }
+
+  /**
+   * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required
+   * that the new replacement transform uses the supplied PCollectionView and does not create
+   * another instance.
    */
   private static class ReflectiveViewOverrideFactory<InputT, ViewT>
       implements PTransformOverrideFactory<PCollection<InputT>,
       PValue, PTransform<PCollection<InputT>, PValue>> {
 
-    private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
-    private final DataflowRunner runner;
+    final Class<PTransform<PCollection<InputT>, PValue>> replacement;
+    final DataflowRunner runner;
 
     private ReflectiveViewOverrideFactory(
-        Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+        Class<PTransform<PCollection<InputT>, PValue>> replacement,
         DataflowRunner runner) {
       this.replacement = replacement;
       this.runner = runner;
     }
 
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
-         final AppliedPTransform<PCollection<InputT>,
-             PValue,
-             PTransform<PCollection<InputT>, PValue>> transform) {
-
+    CreatePCollectionView<ViewT, ViewT> findCreatePCollectionView(
+        final AppliedPTransform<
+            PCollection<InputT>,
+            PValue,
+            PTransform<PCollection<InputT>, PValue>> transform) {
       final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
       transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
         // Stores whether we have entered the expected composite view transform.
@@ -495,18 +524,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       checkState(viewTransformRef.get() != null,
           "Expected to find CreatePCollectionView contained within %s",
           transform.getTransform());
-      PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+      return viewTransformRef.get();
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+         final AppliedPTransform<PCollection<InputT>,
+             PValue,
+             PTransform<PCollection<InputT>, PValue>> transform) {
+
+      PTransform<PCollection<InputT>, PValue> rep =
           InstanceBuilder.ofType(replacement)
               .withArg(DataflowRunner.class, runner)
-              .withArg(CreatePCollectionView.class, viewTransformRef.get())
+              .withArg(CreatePCollectionView.class, findCreatePCollectionView(transform))
               .build();
-      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
     }
 
     @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
-      // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
-      // PCollectionView that was returned.
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+      // We do not replace any of the outputs because we expect that the new PTransform will
+      // re-use the original PCollectionView that was returned.
       return ImmutableMap.of();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e03abb9..81e7a97 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(5, steps.size());
+    assertEquals(9, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(4);
+    Step collectionToSingletonStep = steps.get(8);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }