You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:31 UTC

[beam] 09/50: Organise methods in PipelineTranslator

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6695d6462020afd46857c7b50f981d4187c4a802
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 11:31:43 2018 +0100

    Organise methods in PipelineTranslator
---
 .../spark/structuredstreaming/SparkRunner.java     |  1 -
 .../translation/PipelineTranslator.java            | 64 +++++++++++++---------
 2 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 3e3b112..ab2215b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -99,7 +99,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.replaceTransforms(pipeline, options);
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
     PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
-    //init pipelineTranslator with subclass based on mode and env
     pipelineTranslator.translate(pipeline);
   }
   private void executePipeline(Pipeline pipeline) {}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index db5c354..8eb1fb6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,6 +21,9 @@ import org.slf4j.LoggerFactory;
 
 public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
 
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline preparation methods
+  // --------------------------------------------------------------------------------------------
   /**
    * Local configurations work in the same JVM and have no problems with improperly formatted files
    * on classpath (eg. directories with .class files or empty directories). Prepare files for
@@ -49,32 +52,6 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
     }
   }
 
-  /**
-   * Utility formatting method.
-   *
-   * @param n number of spaces to generate
-   * @return String with "|" followed by n spaces
-   */
-  protected static String genSpaces(int n) {
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      builder.append("|   ");
-    }
-    return builder.toString();
-  }
-
-  /**
-   * Translates the pipeline by passing this class as a visitor.
-   *
-   * @param pipeline The pipeline to be translated
-   */
-  public void translate(Pipeline pipeline) {
-    pipeline.traverseTopologically(this);
-  }
-
-
-
-
   /** The translation mode of the Beam Pipeline. */
   private enum TranslationMode {
 
@@ -116,4 +93,39 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
     }
   }
 
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline utility methods
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Utility formatting method.
+   *
+   * @param n number of spaces to generate
+   * @return String with "|" followed by n spaces
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
+    }
+    return builder.toString();
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline visitor methods
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Translates the pipeline by passing this class as a visitor.
+   *
+   * @param pipeline The pipeline to be translated
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+
+
+
+
 }