You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 14:52:31 UTC

[1/2] incubator-beam git commit: Add control of PipelineVisitor recursion into composite transforms

Repository: incubator-beam
Updated Branches:
  refs/heads/master 07c60a965 -> 03e99540a


Add control of PipelineVisitor recursion into composite transforms


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dbf7a06a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dbf7a06a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dbf7a06a

Branch: refs/heads/master
Commit: dbf7a06a7c901cda065e49914932cf0be5d6db4e
Parents: 07c60a9
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 12:28:50 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 07:29:49 2016 -0700

----------------------------------------------------------------------
 .../direct/ConsumerTrackingPipelineVisitor.java |  7 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  5 +-
 .../FlinkBatchPipelineTranslator.java           | 83 +++++++++-----------
 .../translation/FlinkPipelineTranslator.java    |  2 +-
 .../FlinkStreamingPipelineTranslator.java       | 35 ++-------
 .../dataflow/DataflowPipelineRunner.java        |  5 +-
 .../dataflow/DataflowPipelineTranslator.java    |  7 +-
 .../dataflow/DataflowPipelineRunnerTest.java    | 18 +----
 .../beam/runners/spark/SparkPipelineRunner.java | 48 ++---------
 .../main/java/org/apache/beam/sdk/Pipeline.java | 35 ++++++++-
 .../runners/AggregatorPipelineExtractor.java    | 10 +--
 .../beam/sdk/runners/DirectPipelineRunner.java  | 12 +--
 .../sdk/runners/RecordingPipelineVisitor.java   | 12 +--
 .../beam/sdk/runners/TransformTreeNode.java     | 16 ++--
 .../AggregatorPipelineExtractorTest.java        |  2 +-
 .../beam/sdk/runners/TransformTreeTest.java     | 12 +--
 16 files changed, 124 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
index c790463..3300723 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
@@ -41,7 +41,7 @@ import java.util.Set;
  * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume
  * input after the upstream transform has produced and committed output.
  */
-public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
+public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
   private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
   private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>();
   private Collection<PCollectionView<?>> views = new ArrayList<>();
@@ -51,13 +51,14 @@ public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
   private boolean finalized = false;
 
   @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
     checkState(
         !finalized,
         "Attempting to traverse a pipeline (node %s) with a %s "
             + "which has already visited a Pipeline and is finalized",
         node.getFullName(),
         ConsumerTrackingPipelineVisitor.class.getSimpleName());
+    return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
@@ -73,7 +74,7 @@ public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
   }
 
   @Override
-  public void visitTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformTreeNode node) {
     toFinalize.removeAll(node.getInput().expand());
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
     stepNames.put(appliedTransform, genStepName());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index b7c755e..2fea00a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -56,12 +56,13 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   }
 
   @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
     checkState(
         !finalized,
         "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
         KeyedPValueTrackingVisitor.class.getSimpleName(),
         node);
+    return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
@@ -79,7 +80,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   }
 
   @Override
-  public void visitTransform(TransformTreeNode node) {}
+  public void visitPrimitiveTransform(TransformTreeNode node) {}
 
   @Override
   public void visitValue(PValue value, TransformTreeNode producer) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 456cf09..3d39e81 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -43,11 +43,6 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
   private int depth = 0;
 
-  /**
-   * Composite transform that we want to translate before proceeding with other transforms.
-   */
-  private PTransform<?, ?> currentCompositeTransform;
-
   public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
     this.batchContext = new FlinkBatchTranslationContext(env, options);
   }
@@ -57,54 +52,33 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
   // --------------------------------------------------------------------------------------------
 
   @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
     LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
 
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == null) {
-
-      BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        currentCompositeTransform = transform;
-        if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
-          // we can only optimize CoGroupByKey for input size 2
-          currentCompositeTransform = null;
-        }
-      }
+    BatchTransformTranslator<?> translator = getTranslator(node);
+
+    if (translator != null) {
+      applyBatchTransform(node.getTransform(), node, translator);
+      LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
+      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+    } else {
+      this.depth++;
+      return CompositeBehavior.ENTER_TRANSFORM;
     }
-    this.depth++;
   }
 
   @Override
   public void leaveCompositeTransform(TransformTreeNode node) {
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == transform) {
-
-      BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
-        applyBatchTransform(transform, node, translator);
-        currentCompositeTransform = null;
-      } else {
-        throw new IllegalStateException("Attempted to translate composite transform " +
-            "but no translator was found: " + currentCompositeTransform);
-      }
-    }
     this.depth--;
     LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
   }
 
   @Override
-  public void visitTransform(TransformTreeNode node) {
-    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
-    if (currentCompositeTransform != null) {
-      // ignore it
-      return;
-    }
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + formatNodeName(node));
 
-    // get the transformation corresponding to hte node we are
+    // get the transformation corresponding to the node we are
     // currently visiting and translate it into its Flink alternative.
-
     PTransform<?, ?> transform = node.getTransform();
     BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
     if (translator == null) {
@@ -114,11 +88,6 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     applyBatchTransform(transform, node, translator);
   }
 
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    // do nothing here
-  }
-
   private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
 
     @SuppressWarnings("unchecked")
@@ -140,6 +109,32 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     void translateNode(Type transform, FlinkBatchTranslationContext context);
   }
 
+  /**
+   * Returns a translator for the given node, if it is possible, otherwise null.
+   */
+  private static BatchTransformTranslator<?> getTranslator(TransformTreeNode node) {
+    PTransform<?, ?> transform = node.getTransform();
+
+    // The root node of the graph has a null transform
+    if (transform == null) {
+      return null;
+    }
+
+    BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+
+    // No translator known
+    if (translator == null) {
+      return null;
+    }
+
+    // We only specialize CoGroupByKey when there are exactly 2 inputs
+    if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
+      return null;
+    }
+
+    return translator;
+  }
+
   private static String genSpaces(int n) {
     String s = "";
     for (int i = 0; i < n; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
index 82d23b0..46e5712 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.Pipeline;
  * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
  * {@link org.apache.flink.api.java.DataSet} (for batch) one.
  */
-public abstract class FlinkPipelineTranslator implements Pipeline.PipelineVisitor {
+public abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
 
   public void translate(Pipeline pipeline) {
     pipeline.traverseTopologically(this);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index ebaf6ba..31b2bee 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -43,9 +43,6 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   private int depth = 0;
 
-  /** Composite transform that we want to translate before proceeding with other transforms. */
-  private PTransform<?, ?> currentCompositeTransform;
-
   public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
     this.streamingContext = new FlinkStreamingTranslationContext(env, options);
   }
@@ -55,47 +52,31 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
   // --------------------------------------------------------------------------------------------
 
   @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
     LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
 
     PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == null) {
-
+    if (transform != null) {
       StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
       if (translator != null) {
-        currentCompositeTransform = transform;
+        applyStreamingTransform(transform, node, translator);
+        LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
       }
     }
     this.depth++;
+    return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
   public void leaveCompositeTransform(TransformTreeNode node) {
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == transform) {
-
-      StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
-        applyStreamingTransform(transform, node, translator);
-        currentCompositeTransform = null;
-      } else {
-        throw new IllegalStateException("Attempted to translate composite transform " +
-            "but no translator was found: " + currentCompositeTransform);
-      }
-    }
     this.depth--;
     LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
   }
 
   @Override
-  public void visitTransform(TransformTreeNode node) {
-    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
-    if (currentCompositeTransform != null) {
-      // ignore it
-      return;
-    }
-
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + formatNodeName(node));
     // get the transformation corresponding to hte node we are
     // currently visiting and translate it into its Flink alternative.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
index 41b4df7..4076802 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
@@ -680,17 +680,18 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
         }
 
         @Override
-        public void visitTransform(TransformTreeNode node) {
+        public void visitPrimitiveTransform(TransformTreeNode node) {
           if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
             ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
           }
         }
 
         @Override
-        public void enterCompositeTransform(TransformTreeNode node) {
+        public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
           if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
             ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
           }
+          return CompositeBehavior.ENTER_TRANSFORM;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 4ef1bdb..05879d9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -348,7 +348,7 @@ public class DataflowPipelineTranslator {
   /**
    * Translates a Pipeline into the Dataflow representation.
    */
-  class Translator implements PipelineVisitor, TranslationContext {
+  class Translator extends PipelineVisitor.Defaults implements TranslationContext {
     /** The Pipeline to translate. */
     private final Pipeline pipeline;
 
@@ -493,16 +493,13 @@ public class DataflowPipelineTranslator {
       return currentTransform;
     }
 
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-    }
 
     @Override
     public void leaveCompositeTransform(TransformTreeNode node) {
     }
 
     @Override
-    public void visitTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformTreeNode node) {
       PTransform<?, ?> transform = node.getTransform();
       TransformTranslator translator =
           getTransformTranslator(transform.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
index d4d4b3b..2993c50 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
@@ -84,7 +84,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -820,26 +819,15 @@ public class DataflowPipelineRunnerTest {
   }
 
   /** Records all the composite transforms visited within the Pipeline. */
-  private static class CompositeTransformRecorder implements PipelineVisitor {
+  private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
     private List<PTransform<?, ?>> transforms = new ArrayList<>();
 
     @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
       if (node.getTransform() != null) {
         transforms.add(node.getTransform());
       }
-    }
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
+      return CompositeBehavior.ENTER_TRANSFORM;
     }
 
     public List<PTransform<?, ?>> getCompositeTransforms() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index bae4e53..af5acf1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -41,7 +41,6 @@ import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
 
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -219,7 +218,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
   /**
    * Evaluator on the pipeline.
    */
-  public abstract static class Evaluator implements Pipeline.PipelineVisitor {
+  public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
     protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
 
     protected final SparkPipelineTranslator translator;
@@ -228,62 +227,29 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       this.translator = translator;
     }
 
-    // Set upon entering a composite node which can be directly mapped to a single
-    // TransformEvaluator.
-    private TransformTreeNode currentTranslatedCompositeNode;
-
-    /**
-     * If true, we're currently inside a subtree of a composite node which directly maps to a
-     * single
-     * TransformEvaluator; children nodes are ignored, and upon post-visiting the translated
-     * composite node, the associated TransformEvaluator will be visited.
-     */
-    private boolean inTranslatedCompositeNode() {
-      return currentTranslatedCompositeNode != null;
-    }
-
     @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-      if (!inTranslatedCompositeNode() && node.getTransform() != null) {
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+      if (node.getTransform() != null) {
         @SuppressWarnings("unchecked")
         Class<PTransform<?, ?>> transformClass =
             (Class<PTransform<?, ?>>) node.getTransform().getClass();
         if (translator.hasTranslation(transformClass)) {
           LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
           LOG.debug("Composite transform class: '{}'", transformClass);
-          currentTranslatedCompositeNode = node;
+          doVisitTransform(node);
+          return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
         }
       }
+      return CompositeBehavior.ENTER_TRANSFORM;
     }
 
     @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-      // NB: We depend on enterCompositeTransform and leaveCompositeTransform providing 'node'
-      // objects for which Object.equals() returns true iff they are the same logical node
-      // within the tree.
-      if (inTranslatedCompositeNode() && node.equals(currentTranslatedCompositeNode)) {
-        LOG.info("Post-visiting directly-translatable composite transform: '{}'",
-                node.getFullName());
-        doVisitTransform(node);
-        currentTranslatedCompositeNode = null;
-      }
-    }
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      if (inTranslatedCompositeNode()) {
-        LOG.info("Skipping '{}'; already in composite transform.", node.getFullName());
-        return;
-      }
+    public void visitPrimitiveTransform(TransformTreeNode node) {
       doVisitTransform(node);
     }
 
     protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
         doVisitTransform(TransformTreeNode node);
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 65a0755..4e7e63f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -220,8 +220,10 @@ public class Pipeline {
     /**
      * Called for each composite transform after all topological predecessors have been visited
      * but before any of its component transforms.
+     *
+     * <p>The return value controls whether or not child transforms are visited.
      */
-    public void enterCompositeTransform(TransformTreeNode node);
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode node);
 
     /**
      * Called for each composite transform after all of its component transforms and their outputs
@@ -233,13 +235,42 @@ public class Pipeline {
      * Called for each primitive transform after all of its topological predecessors
      * and inputs have been visited.
      */
-    public void visitTransform(TransformTreeNode node);
+    public void visitPrimitiveTransform(TransformTreeNode node);
 
     /**
      * Called for each value after the transform that produced the value has been
      * visited.
      */
     public void visitValue(PValue value, TransformTreeNode producer);
+
+    /**
+     * Control enum for indicating whether or not a traversal should process the contents of
+     * a composite transform.
+     */
+    public enum CompositeBehavior {
+      ENTER_TRANSFORM,
+      DO_NOT_ENTER_TRANSFORM;
+    }
+
+    /**
+     * Default no-op {@link PipelineVisitor} that enters all composite transforms.
+     * User implementations can override just those methods they are interested in.
+     */
+    public class Defaults implements PipelineVisitor {
+      @Override
+      public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+        return CompositeBehavior.ENTER_TRANSFORM;
+      }
+
+      @Override
+      public void leaveCompositeTransform(TransformTreeNode node) { }
+
+      @Override
+      public void visitPrimitiveTransform(TransformTreeNode node) { }
+
+      @Override
+      public void visitValue(PValue value, TransformTreeNode producer) { }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
index 86a851f..146ddfa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java
@@ -56,7 +56,7 @@ public class AggregatorPipelineExtractor {
     return aggregatorSteps.asMap();
   }
 
-  private static class AggregatorVisitor implements PipelineVisitor {
+  private static class AggregatorVisitor extends PipelineVisitor.Defaults {
     private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
 
     public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
@@ -64,13 +64,7 @@ public class AggregatorPipelineExtractor {
     }
 
     @Override
-    public void enterCompositeTransform(TransformTreeNode node) {}
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {}
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformTreeNode node) {
       PTransform<?, ?> transform = node.getTransform();
       addStepToAggregators(transform, getAggregators(transform));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
index 3cb9703..590ce6f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
@@ -828,7 +828,7 @@ public class DirectPipelineRunner
 
   /////////////////////////////////////////////////////////////////////////////
 
-  class Evaluator implements PipelineVisitor, EvaluationContext {
+  class Evaluator extends PipelineVisitor.Defaults implements EvaluationContext {
     /**
      * A map from PTransform to the step name of that transform. This is the internal name for the
      * transform (e.g. "s2").
@@ -881,15 +881,7 @@ public class DirectPipelineRunner
     }
 
     @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformTreeNode node) {
       PTransform<?, ?> transform = node.getTransform();
       fullNames.put(transform, node.getFullName());
       TransformEvaluator evaluator =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/RecordingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/RecordingPipelineVisitor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/RecordingPipelineVisitor.java
index 84df5fd..d64738f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/RecordingPipelineVisitor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/RecordingPipelineVisitor.java
@@ -30,21 +30,13 @@ import java.util.List;
  *
  * <p>Provided for internal unit tests.
  */
-public class RecordingPipelineVisitor implements Pipeline.PipelineVisitor {
+public class RecordingPipelineVisitor extends Pipeline.PipelineVisitor.Defaults {
 
   public final List<PTransform<?, ?>> transforms = new ArrayList<>();
   public final List<PValue> values = new ArrayList<>();
 
   @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformTreeNode node) {
     transforms.add(node.getTransform());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
index a6efc51..59edd52 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.sdk.runners;
 
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -198,7 +199,7 @@ public class TransformTreeNode {
    * transform (or child nodes for composite transforms), then the
    * output values.
    */
-  public void visit(Pipeline.PipelineVisitor visitor,
+  public void visit(PipelineVisitor visitor,
                     Set<PValue> visitedValues) {
     if (!finishedSpecifying) {
       finishSpecifying();
@@ -212,13 +213,16 @@ public class TransformTreeNode {
     }
 
     if (isCompositeNode()) {
-      visitor.enterCompositeTransform(this);
-      for (TransformTreeNode child : parts) {
-        child.visit(visitor, visitedValues);
+      PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
+
+      if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
+        for (TransformTreeNode child : parts) {
+          child.visit(visitor, visitedValues);
+        }
       }
       visitor.leaveCompositeTransform(this);
     } else {
-      visitor.visitTransform(this);
+      visitor.visitPrimitiveTransform(this);
     }
 
     // Visit outputs.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
index 7950a9e..74cc5e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
@@ -205,7 +205,7 @@ public class AggregatorPipelineExtractorTest {
     public Object answer(InvocationOnMock invocation) throws Throwable {
       PipelineVisitor visitor = (PipelineVisitor) invocation.getArguments()[0];
       for (TransformTreeNode node : nodes) {
-        visitor.visitTransform(node);
+        visitor.visitPrimitiveTransform(node);
       }
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dbf7a06a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index e4eb204..aecebd7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PValue;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -128,9 +127,9 @@ public class TransformTreeTest {
     final EnumSet<TransformsSeen> left =
         EnumSet.noneOf(TransformsSeen.class);
 
-    p.traverseTopologically(new Pipeline.PipelineVisitor() {
+    p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
       @Override
-      public void enterCompositeTransform(TransformTreeNode node) {
+      public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
         PTransform<?, ?> transform = node.getTransform();
         if (transform instanceof Sample.SampleAny) {
           assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
@@ -142,6 +141,7 @@ public class TransformTreeTest {
           assertTrue(node.isCompositeNode());
         }
         assertThat(transform, not(instanceOf(Read.Bounded.class)));
+        return CompositeBehavior.ENTER_TRANSFORM;
       }
 
       @Override
@@ -153,7 +153,7 @@ public class TransformTreeTest {
       }
 
       @Override
-      public void visitTransform(TransformTreeNode node) {
+      public void visitPrimitiveTransform(TransformTreeNode node) {
         PTransform<?, ?> transform = node.getTransform();
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
@@ -163,10 +163,6 @@ public class TransformTreeTest {
           assertTrue(visited.add(TransformsSeen.READ));
         }
       }
-
-      @Override
-      public void visitValue(PValue value, TransformTreeNode producer) {
-      }
     });
 
     assertTrue(visited.equals(EnumSet.allOf(TransformsSeen.class)));



[2/2] incubator-beam git commit: This closes #217

Posted by ke...@apache.org.
This closes #217


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/03e99540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/03e99540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/03e99540

Branch: refs/heads/master
Commit: 03e99540abfa545c7aece1f89c04578c8e59950e
Parents: 07c60a9 dbf7a06
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 9 07:52:15 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 07:52:15 2016 -0700

----------------------------------------------------------------------
 .../direct/ConsumerTrackingPipelineVisitor.java |  7 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  5 +-
 .../FlinkBatchPipelineTranslator.java           | 83 +++++++++-----------
 .../translation/FlinkPipelineTranslator.java    |  2 +-
 .../FlinkStreamingPipelineTranslator.java       | 35 ++-------
 .../dataflow/DataflowPipelineRunner.java        |  5 +-
 .../dataflow/DataflowPipelineTranslator.java    |  7 +-
 .../dataflow/DataflowPipelineRunnerTest.java    | 18 +----
 .../beam/runners/spark/SparkPipelineRunner.java | 48 ++---------
 .../main/java/org/apache/beam/sdk/Pipeline.java | 35 ++++++++-
 .../runners/AggregatorPipelineExtractor.java    | 10 +--
 .../beam/sdk/runners/DirectPipelineRunner.java  | 12 +--
 .../sdk/runners/RecordingPipelineVisitor.java   | 12 +--
 .../beam/sdk/runners/TransformTreeNode.java     | 16 ++--
 .../AggregatorPipelineExtractorTest.java        |  2 +-
 .../beam/sdk/runners/TransformTreeTest.java     | 12 +--
 16 files changed, 124 insertions(+), 185 deletions(-)
----------------------------------------------------------------------