You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:18 UTC
[beam] 08/20: Renames: better differenciate pipeline translator for
transform translator
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b6b426e99f8f4ce322a6327c93af8918318a1563
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 10:52:43 2018 +0100
Renames: better differenciate pipeline translator for transform translator
---
.../spark/structuredstreaming/SparkRunner.java | 6 ++--
.../translation/batch/BatchPipelineTranslator.java | 39 +++++++++++-----------
2 files changed, 22 insertions(+), 23 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 59c08f7..3e3b112 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -98,9 +98,9 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
PipelineTranslator.detectTranslationMode(pipeline, options);
PipelineTranslator.replaceTransforms(pipeline, options);
PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
- PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
- //init translator with subclass based on mode and env
- translator.translate(pipeline);
+ PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+ //init pipelineTranslator with subclass based on mode and env
+ pipelineTranslator.translate(pipeline);
}
private void executePipeline(Pipeline pipeline) {}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index e20e4c0..2459372 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -24,39 +24,38 @@ public class BatchPipelineTranslator extends PipelineTranslator {
private int depth = 0;
@SuppressWarnings("rawtypes")
- private static final Map<String, BatchTransformTranslator>
- TRANSLATORS = new HashMap<>();
+ private static final Map<String, BatchTransformTranslator> TRANSFORM_TRANSLATORS = new HashMap<>();
static {
- TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN,
+ TRANSFORM_TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN,
new CombinePerKeyTranslatorBatch());
- TRANSLATORS
+ TRANSFORM_TRANSLATORS
.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch());
- TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
+ TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
- TRANSLATORS
+ TRANSFORM_TRANSLATORS
.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
- TRANSLATORS
+ TRANSFORM_TRANSLATORS
.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
- TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
+ TRANSFORM_TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
- TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+ TRANSFORM_TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
}
private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
/** Returns a translator for the given node, if it is possible, otherwise null. */
- private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
+ private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
@Nullable PTransform<?, ?> transform = node.getTransform();
// Root of the graph is null
if (transform == null) {
return null;
}
@Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
- return (urn == null) ? null : TRANSLATORS.get(urn);
+ return (urn == null) ? null : TRANSFORM_TRANSLATORS.get(urn);
}
@@ -69,10 +68,10 @@ public class BatchPipelineTranslator extends PipelineTranslator {
LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
depth++;
- BatchTransformTranslator<?> translator = getTranslator(node);
+ BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node);
- if (translator != null) {
- translateNode(node, translator);
+ if (transformTranslator != null) {
+ translateNode(node, transformTranslator);
LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
} else {
@@ -92,28 +91,28 @@ public class BatchPipelineTranslator extends PipelineTranslator {
// get the transformation corresponding to the node we are
// currently visiting and translate it into its Spark alternative.
- BatchTransformTranslator<?> translator = getTranslator(node);
- if (translator == null) {
+ BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node);
+ if (transformTranslator == null) {
String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
throw new UnsupportedOperationException(
"The transform " + transformUrn + " is currently not supported.");
}
- translateNode(node, translator);
+ translateNode(node, transformTranslator);
}
private <T extends PTransform<?, ?>> void translateNode(
TransformHierarchy.Node node,
- BatchTransformTranslator<?> translator) {
+ BatchTransformTranslator<?> transformTranslator) {
@SuppressWarnings("unchecked")
T typedTransform = (T) node.getTransform();
@SuppressWarnings("unchecked")
- BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+ BatchTransformTranslator<T> typedTransformTranslator = (BatchTransformTranslator<T>) transformTranslator;
// create the applied PTransform on the translationContext
translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
- typedTranslator.translateNode(typedTransform, translationContext);
+ typedTransformTranslator.translateNode(typedTransform, translationContext);
}