You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:34 UTC

[beam] 12/50: Make transform translation clearer: renaming, comments

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ce484e9efeffeba3678958521dc61958a18449fd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 22 11:06:15 2018 +0100

    Make transform translation clearer: renaming, comments
---
 .../translation/PipelineTranslator.java                | 18 +++++++++++-------
 .../translation/TransformTranslator.java               |  2 +-
 .../batch/BatchCombinePerKeyTranslator.java            |  2 +-
 .../batch/BatchFlattenPCollectionTranslator.java       |  2 +-
 .../translation/batch/BatchGroupByKeyTranslator.java   |  2 +-
 .../translation/batch/BatchParDoTranslator.java        |  2 +-
 .../translation/batch/BatchReadSourceTranslator.java   |  2 +-
 .../translation/batch/BatchReshuffleTranslator.java    |  2 +-
 .../translation/batch/BatchWindowAssignTranslator.java |  2 +-
 9 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index 62e87f2..185879b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -123,19 +123,23 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
    */
   protected abstract TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node);
 
-  private <T extends PTransform<?, ?>> void translateNode(
+  /**
+   * Apply the given TransformTranslator to the given node.
+   */
+  private <T extends PTransform<?, ?>> void applyTransformTranslator(
       TransformHierarchy.Node node,
       TransformTranslator<?> transformTranslator) {
+    // create the applied PTransform on the translationContext
+    translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
 
+    // avoid type capture
     @SuppressWarnings("unchecked")
     T typedTransform = (T) node.getTransform();
-
     @SuppressWarnings("unchecked")
     TransformTranslator<T> typedTransformTranslator = (TransformTranslator<T>) transformTranslator;
 
-    // create the applied PTransform on the translationContext
-    translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
-    typedTransformTranslator.translateNode(typedTransform, translationContext);
+    // apply the transformTranslator
+    typedTransformTranslator.translateTransform(typedTransform, translationContext);
   }
 
 
@@ -165,7 +169,7 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
     TransformTranslator<?> transformTranslator = getTransformTranslator(node);
 
     if (transformTranslator != null) {
-      translateNode(node, transformTranslator);
+      applyTransformTranslator(node, transformTranslator);
       LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
       return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
     } else {
@@ -191,6 +195,6 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
       throw new UnsupportedOperationException(
           "The transform " + transformUrn + " is currently not supported.");
     }
-    translateNode(node, transformTranslator);
+    applyTransformTranslator(node, transformTranslator);
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
index 51cdd99..ebb8bf8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
@@ -6,6 +6,6 @@ public interface TransformTranslator<TransformT extends PTransform> {
 
   /** A translator of a {@link PTransform}. */
 
-  void translateNode(TransformT transform, TranslationContext context);
+  void translateTransform(TransformT transform, TranslationContext context);
   }
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
index c9cae47..858df18 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection;
 class BatchCombinePerKeyTranslator<K, InputT, AccumT, OutputT> implements
     TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
 
-  @Override public void translateNode(
+  @Override public void translateTransform(
       PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
       TranslationContext context) {
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
index 77f6fdb..90c487a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 class BatchFlattenPCollectionTranslator<T> implements
     TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
 
-  @Override public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform,
+  @Override public void translateTransform(PTransform<PCollectionList<T>, PCollection<T>> transform,
       TranslationContext context) {
 
   }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
index 1bd42f5..52a3c39 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection;
 class BatchGroupByKeyTranslator<K, InputT> implements
     TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
 
-  @Override public void translateNode(
+  @Override public void translateTransform(
       PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
       TranslationContext context) {
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
index cf8c896..6e7f342 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 class BatchParDoTranslator<InputT, OutputT> implements
     TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
 
-  @Override public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform,
+  @Override public void translateTransform(PTransform<PCollection<InputT>, PCollectionTuple> transform,
       TranslationContext context) {
 
   }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
index f5f0351..4236b1c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
@@ -8,7 +8,7 @@ import org.apache.beam.sdk.values.PCollection;
 
 class BatchReadSourceTranslator<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  @Override public void translateNode(PTransform<PBegin, PCollection<T>> transform,
+  @Override public void translateTransform(PTransform<PBegin, PCollection<T>> transform,
       TranslationContext context) {
 
   }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
index 5fab1c8..5baa331 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
@@ -6,7 +6,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
 
 class BatchReshuffleTranslator<K, InputT> implements TransformTranslator<Reshuffle<K, InputT>> {
 
-  @Override public void translateNode(Reshuffle<K, InputT> transform, TranslationContext context) {
+  @Override public void translateTransform(Reshuffle<K, InputT> transform, TranslationContext context) {
 
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
index fbbced5..1a8f68b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
@@ -8,7 +8,7 @@ import org.apache.beam.sdk.values.PCollection;
 class BatchWindowAssignTranslator<T> implements
     TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
 
-  @Override public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform,
+  @Override public void translateTransform(PTransform<PCollection<T>, PCollection<T>> transform,
       TranslationContext context) {
   }
 }