You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:31 UTC
[beam] 09/50: Organise methods in PipelineTranslator
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6695d6462020afd46857c7b50f981d4187c4a802
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 11:31:43 2018 +0100
Organise methods in PipelineTranslator
---
.../spark/structuredstreaming/SparkRunner.java | 1 -
.../translation/PipelineTranslator.java | 64 +++++++++++++---------
2 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 3e3b112..ab2215b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -99,7 +99,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
PipelineTranslator.replaceTransforms(pipeline, options);
PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
- //init pipelineTranslator with subclass based on mode and env
pipelineTranslator.translate(pipeline);
}
private void executePipeline(Pipeline pipeline) {}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index db5c354..8eb1fb6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,6 +21,9 @@ import org.slf4j.LoggerFactory;
public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
+ // --------------------------------------------------------------------------------------------
+ // Pipeline preparation methods
+ // --------------------------------------------------------------------------------------------
/**
* Local configurations work in the same JVM and have no problems with improperly formatted files
* on classpath (eg. directories with .class files or empty directories). Prepare files for
@@ -49,32 +52,6 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
}
}
- /**
- * Utility formatting method.
- *
- * @param n number of spaces to generate
- * @return String with "|" followed by n spaces
- */
- protected static String genSpaces(int n) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < n; i++) {
- builder.append("| ");
- }
- return builder.toString();
- }
-
- /**
- * Translates the pipeline by passing this class as a visitor.
- *
- * @param pipeline The pipeline to be translated
- */
- public void translate(Pipeline pipeline) {
- pipeline.traverseTopologically(this);
- }
-
-
-
-
/** The translation mode of the Beam Pipeline. */
private enum TranslationMode {
@@ -116,4 +93,39 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
}
}
+ // --------------------------------------------------------------------------------------------
+ // Pipeline utility methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Utility formatting method: repeats the marker literal appended below ("|" plus whitespace).
+ *
+ * @param n number of times the marker is appended; zero or a negative value yields ""
+ * @return String made of the marker repeated n times (not a single "|" followed by n spaces)
+ */
+ protected static String genSpaces(int n) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ builder.append("| ");
+ }
+ return builder.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline visitor methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Translates the pipeline by passing this instance as the visitor to a topological traversal.
+ *
+ * @param pipeline The pipeline to be translated; {@code traverseTopologically(this)} is called on it
+ */
+ public void translate(Pipeline pipeline) {
+ pipeline.traverseTopologically(this);
+ }
+
+
+
+
+
}