You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:28:32 UTC
[14/50] incubator-beam git commit: [BEAM-794] Differ combining in
case of merging windows with sideInputs.
[BEAM-794] Differ combining in case of merging windows with sideInputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7cc8206
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7cc8206
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7cc8206
Branch: refs/heads/apex-runner
Commit: a7cc8206cbbc6ac10e71a0563da2fea4c708277b
Parents: 4c90582
Author: Sela <an...@paypal.com>
Authored: Fri Oct 21 16:00:57 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Sat Oct 22 12:23:33 2016 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 33 +++++++++++++++++++-
1 file changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7cc8206/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index cad53be..b17c38c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.spark;
import java.util.Collection;
+import java.util.List;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -34,11 +35,13 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
@@ -206,7 +209,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
@SuppressWarnings("unchecked")
Class<PTransform<?, ?>> transformClass =
(Class<PTransform<?, ?>>) node.getTransform().getClass();
- if (translator.hasTranslation(transformClass)) {
+ if (translator.hasTranslation(transformClass) && !shouldDefer(node)) {
LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
LOG.debug("Composite transform class: '{}'", transformClass);
doVisitTransform(node);
@@ -216,6 +219,34 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
return CompositeBehavior.ENTER_TRANSFORM;
}
+ /**
+ * Whether to expand (rather than directly translate) a Combine.PerKey / Combine.Globally
+ * whose input PCollection uses merging windows and which has at least one side input.
+ */
+ private boolean shouldDefer(TransformTreeNode node) {
+ PInput input = node.getInput();
+ // if the input is not a PCollection, or it is but with non merging windows, don't defer.
+ if (!(input instanceof PCollection)
+ || ((PCollection<?>) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
+ return false;
+ }
+ // the input is a PCollection with merging windows; collect a Combine's side inputs, if any.
+ PTransform<?, ?> transform = node.getTransform();
+ List<PCollectionView<?>> sideInputs = null;
+ if (transform instanceof Combine.PerKey) {
+ sideInputs = ((Combine.PerKey<?, ?, ?>) transform).getSideInputs();
+ } else if (transform instanceof Combine.Globally) {
+ sideInputs = ((Combine.Globally<?, ?>) transform).getSideInputs();
+ }
+ // defer only if side inputs are defined (null is checked defensively, as before).
+ if (sideInputs == null || sideInputs.isEmpty()) {
+ return false;
+ }
+ LOG.info("Deferring combine transformation {} for job {}", transform,
+ ctxt.getPipeline().getOptions().getJobName());
+ return true;
+ }
+
@Override
public void visitPrimitiveTransform(TransformTreeNode node) {
doVisitTransform(node);