You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:18 UTC

[beam] 08/20: Renames: better differentiate pipeline translator from transform translator

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b6b426e99f8f4ce322a6327c93af8918318a1563
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 10:52:43 2018 +0100

    Renames: better differentiate pipeline translator from transform translator
---
 .../spark/structuredstreaming/SparkRunner.java     |  6 ++--
 .../translation/batch/BatchPipelineTranslator.java | 39 +++++++++++-----------
 2 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 59c08f7..3e3b112 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -98,9 +98,9 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.detectTranslationMode(pipeline, options);
     PipelineTranslator.replaceTransforms(pipeline, options);
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
-    PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
-    //init translator with subclass based on mode and env
-    translator.translate(pipeline);
+    PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+    //init pipelineTranslator with subclass based on mode and env
+    pipelineTranslator.translate(pipeline);
   }
   private void executePipeline(Pipeline pipeline) {}
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index e20e4c0..2459372 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -24,39 +24,38 @@ public class BatchPipelineTranslator extends PipelineTranslator {
   private int depth = 0;
 
   @SuppressWarnings("rawtypes")
-  private static final Map<String, BatchTransformTranslator>
-      TRANSLATORS = new HashMap<>();
+  private static final Map<String, BatchTransformTranslator> TRANSFORM_TRANSLATORS = new HashMap<>();
 
   static {
-    TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN,
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN,
         new CombinePerKeyTranslatorBatch());
-    TRANSLATORS
+    TRANSFORM_TRANSLATORS
         .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch());
-    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
 
-    TRANSLATORS
+    TRANSFORM_TRANSLATORS
         .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS
+    TRANSFORM_TRANSLATORS
         .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
 
-    TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
 
-    TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
   }
   private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
 
 
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
-  private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
+  private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
     @Nullable PTransform<?, ?> transform = node.getTransform();
     // Root of the graph is null
     if (transform == null) {
       return null;
     }
     @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
-    return (urn == null) ? null : TRANSLATORS.get(urn);
+    return (urn == null) ? null : TRANSFORM_TRANSLATORS.get(urn);
   }
 
 
@@ -69,10 +68,10 @@ public class BatchPipelineTranslator extends PipelineTranslator {
     LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
     depth++;
 
-    BatchTransformTranslator<?> translator = getTranslator(node);
+    BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node);
 
-    if (translator != null) {
-      translateNode(node, translator);
+    if (transformTranslator != null) {
+      translateNode(node, transformTranslator);
       LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
       return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
     } else {
@@ -92,28 +91,28 @@ public class BatchPipelineTranslator extends PipelineTranslator {
 
     // get the transformation corresponding to the node we are
     // currently visiting and translate it into its Spark alternative.
-    BatchTransformTranslator<?> translator = getTranslator(node);
-    if (translator == null) {
+    BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node);
+    if (transformTranslator == null) {
       String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
       throw new UnsupportedOperationException(
           "The transform " + transformUrn + " is currently not supported.");
     }
-    translateNode(node, translator);
+    translateNode(node, transformTranslator);
   }
 
   private <T extends PTransform<?, ?>> void translateNode(
       TransformHierarchy.Node node,
-      BatchTransformTranslator<?> translator) {
+      BatchTransformTranslator<?> transformTranslator) {
 
     @SuppressWarnings("unchecked")
     T typedTransform = (T) node.getTransform();
 
     @SuppressWarnings("unchecked")
-    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+    BatchTransformTranslator<T> typedTransformTranslator = (BatchTransformTranslator<T>) transformTranslator;
 
     // create the applied PTransform on the translationContext
     translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
-    typedTranslator.translateNode(typedTransform, translationContext);
+    typedTransformTranslator.translateNode(typedTransform, translationContext);
   }