You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:32 UTC

[14/50] incubator-beam git commit: [BEAM-794] Defer combining in case of merging windows with side inputs.

[BEAM-794] Defer combining in case of merging windows with side inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7cc8206
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7cc8206
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7cc8206

Branch: refs/heads/apex-runner
Commit: a7cc8206cbbc6ac10e71a0563da2fea4c708277b
Parents: 4c90582
Author: Sela <an...@paypal.com>
Authored: Fri Oct 21 16:00:57 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Sat Oct 22 12:23:33 2016 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 33 +++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7cc8206/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index cad53be..b17c38c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark;
 
 import java.util.Collection;
+import java.util.List;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -34,11 +35,13 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -206,7 +209,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
         @SuppressWarnings("unchecked")
         Class<PTransform<?, ?>> transformClass =
             (Class<PTransform<?, ?>>) node.getTransform().getClass();
-        if (translator.hasTranslation(transformClass)) {
+        if (translator.hasTranslation(transformClass) && !shouldDefer(node)) {
           LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
           LOG.debug("Composite transform class: '{}'", transformClass);
           doVisitTransform(node);
@@ -216,6 +219,34 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
       return CompositeBehavior.ENTER_TRANSFORM;
     }
 
+    /**
+     * Returns true if this composite must not be translated directly even though a translator
+     * exists: its input has merging windows and it is a Combine declaring side inputs.
+     */
+    private boolean shouldDefer(TransformTreeNode node) {
+      PInput input = node.getInput();
+      // Non-PCollection inputs, and PCollections with non-merging windows, are never deferred.
+      if (!(input instanceof PCollection)
+          || ((PCollection<?>) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
+        return false;
+      }
+      // The input has merging windows; only Combine.PerKey and Combine.Globally are checked.
+      PTransform<?, ?> transform = node.getTransform();
+      List<PCollectionView<?>> sideInputs = null;
+      if (transform instanceof Combine.PerKey) {
+        sideInputs = ((Combine.PerKey<?, ?, ?>) transform).getSideInputs();
+      } else if (transform instanceof Combine.Globally) {
+        sideInputs = ((Combine.Globally<?, ?>) transform).getSideInputs();
+      }
+      // Other transforms (null) and Combines without side inputs keep direct translation.
+      if (sideInputs == null || sideInputs.isEmpty()) {
+        return false;
+      }
+      LOG.info("Deferring combine transformation {} for job {}", transform,
+          ctxt.getPipeline().getOptions().getJobName());
+      return true;
+    }
+
     @Override
     public void visitPrimitiveTransform(TransformTreeNode node) {
       doVisitTransform(node);