You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:34 UTC
[beam] 12/50: Make transform translation clearer: renaming, comments
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ce484e9efeffeba3678958521dc61958a18449fd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 22 11:06:15 2018 +0100
Make transform translation clearer: renaming, comments
---
.../translation/PipelineTranslator.java | 18 +++++++++++-------
.../translation/TransformTranslator.java | 2 +-
.../batch/BatchCombinePerKeyTranslator.java | 2 +-
.../batch/BatchFlattenPCollectionTranslator.java | 2 +-
.../translation/batch/BatchGroupByKeyTranslator.java | 2 +-
.../translation/batch/BatchParDoTranslator.java | 2 +-
.../translation/batch/BatchReadSourceTranslator.java | 2 +-
.../translation/batch/BatchReshuffleTranslator.java | 2 +-
.../translation/batch/BatchWindowAssignTranslator.java | 2 +-
9 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index 62e87f2..185879b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -123,19 +123,23 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
*/
protected abstract TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node);
- private <T extends PTransform<?, ?>> void translateNode(
+ /**
+ * Apply the given TransformTranslator to the given node.
+ */
+ private <T extends PTransform<?, ?>> void applyTransformTranslator(
TransformHierarchy.Node node,
TransformTranslator<?> transformTranslator) {
+ // create the applied PTransform on the translationContext
+ translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
+ // avoid type capture
@SuppressWarnings("unchecked")
T typedTransform = (T) node.getTransform();
-
@SuppressWarnings("unchecked")
TransformTranslator<T> typedTransformTranslator = (TransformTranslator<T>) transformTranslator;
- // create the applied PTransform on the translationContext
- translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
- typedTransformTranslator.translateNode(typedTransform, translationContext);
+ // apply the transformTranslator
+ typedTransformTranslator.translateTransform(typedTransform, translationContext);
}
@@ -165,7 +169,7 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
TransformTranslator<?> transformTranslator = getTransformTranslator(node);
if (transformTranslator != null) {
- translateNode(node, transformTranslator);
+ applyTransformTranslator(node, transformTranslator);
LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
} else {
@@ -191,6 +195,6 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
throw new UnsupportedOperationException(
"The transform " + transformUrn + " is currently not supported.");
}
- translateNode(node, transformTranslator);
+ applyTransformTranslator(node, transformTranslator);
}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
index 51cdd99..ebb8bf8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
@@ -6,6 +6,6 @@ public interface TransformTranslator<TransformT extends PTransform> {
/** A translator of a {@link PTransform}. */
- void translateNode(TransformT transform, TranslationContext context);
+ void translateTransform(TransformT transform, TranslationContext context);
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
index c9cae47..858df18 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection;
class BatchCombinePerKeyTranslator<K, InputT, AccumT, OutputT> implements
TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
- @Override public void translateNode(
+ @Override public void translateTransform(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
TranslationContext context) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
index 77f6fdb..90c487a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollectionList;
class BatchFlattenPCollectionTranslator<T> implements
TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
- @Override public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform,
+ @Override public void translateTransform(PTransform<PCollectionList<T>, PCollection<T>> transform,
TranslationContext context) {
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
index 1bd42f5..52a3c39 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection;
class BatchGroupByKeyTranslator<K, InputT> implements
TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
- @Override public void translateNode(
+ @Override public void translateTransform(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
TranslationContext context) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
index cf8c896..6e7f342 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
class BatchParDoTranslator<InputT, OutputT> implements
TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
- @Override public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform,
+ @Override public void translateTransform(PTransform<PCollection<InputT>, PCollectionTuple> transform,
TranslationContext context) {
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
index f5f0351..4236b1c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
@@ -8,7 +8,7 @@ import org.apache.beam.sdk.values.PCollection;
class BatchReadSourceTranslator<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
- @Override public void translateNode(PTransform<PBegin, PCollection<T>> transform,
+ @Override public void translateTransform(PTransform<PBegin, PCollection<T>> transform,
TranslationContext context) {
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
index 5fab1c8..5baa331 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
@@ -6,7 +6,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
class BatchReshuffleTranslator<K, InputT> implements TransformTranslator<Reshuffle<K, InputT>> {
- @Override public void translateNode(Reshuffle<K, InputT> transform, TranslationContext context) {
+ @Override public void translateTransform(Reshuffle<K, InputT> transform, TranslationContext context) {
}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
index fbbced5..1a8f68b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
@@ -8,7 +8,7 @@ import org.apache.beam.sdk.values.PCollection;
class BatchWindowAssignTranslator<T> implements
TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
- @Override public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform,
+ @Override public void translateTransform(PTransform<PCollection<T>, PCollection<T>> transform,
TranslationContext context) {
}
}