Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:17 UTC

[beam] 07/20: Wire node translators with pipeline translator

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b77c7bbee35b9cf48e2a2994ad7330e7e2fa9ae6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 10:43:19 2018 +0100

    Wire node translators with pipeline translator
---
 .../translation/PipelineTranslator.java            | 15 ++++-
 .../translation/batch/BatchPipelineTranslator.java | 66 ++++++++++++++++++++--
 .../translation/batch/BatchTranslationContext.java |  3 +
 3 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index 99621f6..db5c354 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
 
 public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
 
-
   /**
    * Local configurations work in the same JVM and have no problems with improperly formatted files
    * on classpath (eg. directories with .class files or empty directories). Prepare files for
@@ -51,6 +50,20 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
   }
 
   /**
+   * Utility method that builds the indentation prefix used in the visitor log output.
+   *
+   * @param n indentation depth (number of nesting levels)
+   * @return String containing n repetitions of "|   "
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
+    }
+    return builder.toString();
+  }
+
+  /**
    * Translates the pipeline by passing this class as a visitor.
    *
    * @param pipeline The pipeline to be translated
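
The translate entry point documented above relies on Beam's visitor mechanism: the
translator hands itself to Pipeline.traverseTopologically, which walks the transform
hierarchy and calls back into enterCompositeTransform, visitPrimitiveTransform and
leaveCompositeTransform. A minimal sketch of how that entry point and genSpaces fit
together, assuming a translate method whose body falls outside this hunk:

    // Sketch only: the method body is not visible in the hunk above.
    public void translate(Pipeline pipeline) {
      // Beam walks the transform hierarchy depth-first and invokes the visitor
      // callbacks implemented by the concrete pipeline translators.
      pipeline.traverseTopologically(this);
    }

    // genSpaces(2) returns "|   |   ", so nested composites appear as an
    // indented tree in the logs emitted by the batch translator below.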
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index 2f7ac23..e20e4c0 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -8,6 +8,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTra
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Spark batch job. */
 
@@ -18,6 +20,9 @@ public class BatchPipelineTranslator extends PipelineTranslator {
   //  Transform Translator Registry
   // --------------------------------------------------------------------------------------------
 
+  private BatchTranslationContext translationContext;
+  private int depth = 0;
+
   @SuppressWarnings("rawtypes")
   private static final Map<String, BatchTransformTranslator>
       TRANSLATORS = new HashMap<>();
@@ -39,6 +44,9 @@ public class BatchPipelineTranslator extends PipelineTranslator {
 
     TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
   }
+  private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
+
+
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
   private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
@@ -52,15 +60,61 @@ public class BatchPipelineTranslator extends PipelineTranslator {
   }
 
 
-  @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    return super.enterCompositeTransform(node);
-    //TODO impl
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // --------------------------------------------------------------------------------------------
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
+    depth++;
+
+    BatchTransformTranslator<?> translator = getTranslator(node);
+
+    if (translator != null) {
+      translateNode(node, translator);
+      LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
+      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+    } else {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
   }
 
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    depth--;
+    LOG.info("{} leaveCompositeTransform- {}", genSpaces(depth), node.getFullName());
+  }
 
-  @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    super.visitPrimitiveTransform(node);
-    //TODO impl
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(depth), node.getFullName());
+
+    // get the transformation corresponding to the node we are
+    // currently visiting and translate it into its Spark alternative.
+    BatchTransformTranslator<?> translator = getTranslator(node);
+    if (translator == null) {
+      String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
+      throw new UnsupportedOperationException(
+          "The transform " + transformUrn + " is currently not supported.");
+    }
+    translateNode(node, translator);
   }
 
+  private <T extends PTransform<?, ?>> void translateNode(
+      TransformHierarchy.Node node,
+      BatchTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) node.getTransform();
+
+    @SuppressWarnings("unchecked")
+    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+    // set the node's AppliedPTransform as the current transform on the translation context
+    translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
+    typedTranslator.translateNode(typedTransform, translationContext);
   }
+
+
+}
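
The dispatch in translateNode casts both the node's transform and the registered
translator to a common type T before delegating. The BatchTransformTranslator interface
itself is not part of this commit; below is a sketch of a translator that could be
registered in TRANSLATORS, assuming the interface simply mirrors the
translateNode(transform, context) call made above. FlattenTranslatorBatch and its
registration are illustrative, not taken from this diff:

    // Hypothetical translator; the interface shape is inferred from the call
    // typedTranslator.translateNode(typedTransform, translationContext).
    class FlattenTranslatorBatch<T>
        implements BatchTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {

      @Override
      public void translateNode(
          PTransform<PCollectionList<T>, PCollection<T>> transform,
          BatchTranslationContext context) {
        // A real translator would build the Spark Datasets for this transform here,
        // reading inputs and outputs from the AppliedPTransform stored on the
        // context via setCurrentTransform.
      }
    }

    // Registration, following the READ_TRANSFORM_URN example above:
    // TRANSLATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch<>());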
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index 554beea..1d991f1 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -33,4 +33,7 @@ public class BatchTranslationContext {
     this.danglingDataSets = new HashMap<>();
   }
 
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
 }
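
The setter added here stores the AppliedPTransform that BatchPipelineTranslator.translateNode
is about to translate. The backing field and any accessor lie outside this hunk; a sketch of
the surrounding context class, where only the setter comes from the diff and the field and
getter are assumptions:

    // Inside BatchTranslationContext; only setCurrentTransform appears in the diff.
    private AppliedPTransform<?, ?, ?> currentTransform;

    public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
      this.currentTransform = currentTransform;
    }

    // Hypothetical accessor a BatchTransformTranslator could use to reach the
    // inputs and outputs of the transform currently being translated.
    public AppliedPTransform<?, ?, ?> getCurrentTransform() {
      return currentTransform;
    }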