You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:17 UTC
[beam] 07/20: Wire node translators with pipeline translator
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b77c7bbee35b9cf48e2a2994ad7330e7e2fa9ae6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 10:43:19 2018 +0100
Wire node translators with pipeline translator
---
.../translation/PipelineTranslator.java | 15 ++++-
.../translation/batch/BatchPipelineTranslator.java | 66 ++++++++++++++++++++--
.../translation/batch/BatchTranslationContext.java | 3 +
3 files changed, 77 insertions(+), 7 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index 99621f6..db5c354 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
-
/**
* Local configurations work in the same JVM and have no problems with improperly formatted files
* on classpath (eg. directories with .class files or empty directories). Prepare files for
@@ -51,6 +50,20 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
}
/**
+ * Utility formatting method.
+ *
+ * @param n the nesting depth, i.e. the number of "| " segments to generate
+ * @return a String consisting of n repetitions of "| ", used to indent log output
+ */
+ protected static String genSpaces(int n) {
+ // Each iteration appends exactly two characters ("| "), so presize the
+ // builder to 2 * n to avoid intermediate reallocation.
+ StringBuilder builder = new StringBuilder(2 * n);
+ for (int i = 0; i < n; i++) {
+ builder.append("| ");
+ }
+ return builder.toString();
+ }
+
+ /**
* Translates the pipeline by passing this class as a visitor.
*
* @param pipeline The pipeline to be translated
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index 2f7ac23..e20e4c0 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -8,6 +8,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTra
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Spark batch job. */
@@ -18,6 +20,9 @@ public class BatchPipelineTranslator extends PipelineTranslator {
// Transform Translator Registry
// --------------------------------------------------------------------------------------------
+ private BatchTranslationContext translationContext;
+ private int depth = 0;
+
@SuppressWarnings("rawtypes")
private static final Map<String, BatchTransformTranslator>
TRANSLATORS = new HashMap<>();
@@ -39,6 +44,9 @@ public class BatchPipelineTranslator extends PipelineTranslator {
TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
}
+ private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
+
+
/** Returns a translator for the given node, if it is possible, otherwise null. */
private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
@@ -52,15 +60,61 @@ public class BatchPipelineTranslator extends PipelineTranslator {
}
- @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- return super.enterCompositeTransform(node);
- //TODO impl
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Visits a composite node of the pipeline: if a translator is registered for the node's
+ * transform URN, translates the whole composite at once and skips its sub-transforms;
+ * otherwise asks the visitor framework to descend into the children.
+ */
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
+ // depth is only used to indent the visitor log output above.
+ depth++;
+
+ BatchTransformTranslator<?> translator = getTranslator(node);
+
+ if (translator != null) {
+ // The composite has a direct Spark translation: translate it as a unit and
+ // do not visit its expansion.
+ translateNode(node, translator);
+ LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ } else {
+ // No direct translation: recurse so the primitive sub-transforms get translated.
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+ }
+ /** Restores the logging indentation level when leaving a composite node. */
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ depth--;
+ LOG.info("{} leaveCompositeTransform- {}", genSpaces(depth), node.getFullName());
+ }
- @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- super.visitPrimitiveTransform(node);
- //TODO impl
+ /**
+ * Visits a primitive (leaf) transform and translates it into its Spark equivalent.
+ * Unlike composites, a primitive with no registered translator is a hard error, since
+ * there is nothing further to descend into.
+ */
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ LOG.info("{} visitPrimitiveTransform- {}", genSpaces(depth), node.getFullName());
+
+ // get the transformation corresponding to the node we are
+ // currently visiting and translate it into its Spark alternative.
+ BatchTransformTranslator<?> translator = getTranslator(node);
+ if (translator == null) {
+ String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
+ throw new UnsupportedOperationException(
+ "The transform " + transformUrn + " is currently not supported.");
+ }
+ translateNode(node, translator);
}
+ /**
+ * Applies the given translator to the given node's transform. The two unchecked casts
+ * pair the transform with its translator under a common type parameter T; this is safe
+ * as long as the TRANSLATORS registry maps each URN to a translator of the matching
+ * transform type.
+ *
+ * <p>NOTE(review): translationContext is never assigned in any hunk of this commit —
+ * confirm it is initialized (e.g. in a constructor) before the pipeline is traversed,
+ * otherwise this method throws a NullPointerException.
+ */
+ private <T extends PTransform<?, ?>> void translateNode(
+ TransformHierarchy.Node node,
+ BatchTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) node.getTransform();
+
+ @SuppressWarnings("unchecked")
+ BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the translationContext
+ translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
+ typedTranslator.translateNode(typedTransform, translationContext);
}
+
+
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index 554beea..1d991f1 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -33,4 +33,7 @@ public class BatchTranslationContext {
this.danglingDataSets = new HashMap<>();
}
+ /** Records the transform currently being translated, for node translators to read. */
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
}