You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:28 UTC

[beam] 06/50: Add nodes translators structure

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 28a9422293fc7390286bea084d2c7c895d2b32b6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 09:36:49 2018 +0100

    Add nodes translators structure
---
 .../translation/BatchPipelineTranslator.java       | 20 -------
 .../translation/batch/BatchPipelineTranslator.java | 66 ++++++++++++++++++++++
 .../batch/BatchTransformTranslator.java            | 11 ++++
 .../translation/batch/BatchTranslationContext.java | 36 ++++++++++++
 .../batch/CombinePerKeyTranslatorBatch.java        | 14 +++++
 .../batch/FlattenPCollectionTranslatorBatch.java   | 13 +++++
 .../batch/GroupByKeyTranslatorBatch.java           | 14 +++++
 .../translation/batch/ParDoTranslatorBatch.java    | 13 +++++
 .../batch/ReadSourceTranslatorBatch.java           | 12 ++++
 .../batch/ReshuffleTranslatorBatch.java            | 11 ++++
 .../batch/WindowAssignTranslatorBatch.java         | 12 ++++
 .../StreamingPipelineTranslator.java               |  6 +-
 12 files changed, 206 insertions(+), 22 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
deleted file mode 100644
index e66555c..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/BatchPipelineTranslator.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.values.PValue;
-
-public class BatchPipelineTranslator extends PipelineTranslator {
-
-
-  @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    return super.enterCompositeTransform(node);
-  }
-
-
-  @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    super.visitPrimitiveTransform(node);
-  }
-
-
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
new file mode 100644
index 0000000..2f7ac23
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -0,0 +1,66 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+
+
+/** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Spark batch job. */
+public class BatchPipelineTranslator extends PipelineTranslator {
+
+  // --------------------------------------------------------------------------------------------
+  //  Transform Translator Registry
+  // --------------------------------------------------------------------------------------------
+
+  /** Batch translators keyed by the URN of the transform they are registered for. */
+  @SuppressWarnings("rawtypes")
+  private static final Map<String, BatchTransformTranslator>
+      TRANSLATORS = new HashMap<>();
+
+  static {
+    TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN,
+        new CombinePerKeyTranslatorBatch());
+    TRANSLATORS
+        .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
+
+    TRANSLATORS
+        .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+
+    TRANSLATORS
+        .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
+
+    TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
+
+    TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+  }
+
+  /** Returns the translator registered for the node's transform URN, or null if there is none. */
+  private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
+    @Nullable PTransform<?, ?> transform = node.getTransform();
+    // The root of the graph has a null transform, so there is nothing to look up.
+    if (transform == null) {
+      return null;
+    }
+    @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+    return (urn == null) ? null : TRANSLATORS.get(urn);
+  }
+
+  /** Delegates to the superclass for now; the batch-specific handling is still to be written. */
+  @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    return super.enterCompositeTransform(node);
+    // TODO: implement the handling of composite transforms.
+  }
+
+  /** Delegates to the superclass for now; the batch-specific handling is still to be written. */
+  @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    super.visitPrimitiveTransform(node);
+    // TODO: implement the handling of primitive transforms.
+  }
+
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java
new file mode 100644
index 0000000..ab0cf68
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java
@@ -0,0 +1,11 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+public interface BatchTransformTranslator<TransformT extends PTransform> {
+  // NOTE(review): the bound above uses PTransform as a raw type.
+
+  /** Translates the given {@link PTransform} in batch mode, using the supplied context. */
+  void translateNode(TransformT transform, BatchTranslationContext context);
+}
+
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
new file mode 100644
index 0000000..554beea
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -0,0 +1,36 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Keeps track of the {@link Dataset}s and the step the translation is in.
+ */
+public class BatchTranslationContext {
+  private final Map<PValue, Dataset<?>> datasets;
+
+  /**
+   * Keeps track of the Datasets that don't have a successor. We need to terminate these with
+   * a discarding sink because the Beam model allows dangling operations.
+   */
+  private final Map<PValue, Dataset<?>> danglingDataSets;
+
+  private final SparkSession sparkSession;
+  private final SparkPipelineOptions options;
+  // NOTE(review): presumably the step currently being translated; nothing assigns it yet.
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  /** Stores the given session and options and starts with empty dataset maps. */
+  public BatchTranslationContext(SparkSession sparkSession, SparkPipelineOptions options) {
+    this.sparkSession = sparkSession;
+    this.options = options;
+    this.datasets = new HashMap<>();
+    this.danglingDataSets = new HashMap<>();
+  }
+
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
new file mode 100644
index 0000000..6099fbc
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -0,0 +1,14 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> implements BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+      BatchTranslationContext context) {
+    // TODO: implement the translation; AccumT is declared on the class but not used yet.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
new file mode 100644
index 0000000..281eda9
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
@@ -0,0 +1,13 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+class FlattenPCollectionTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform,
+      BatchTranslationContext context) {
+    // TODO: implement the translation.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
new file mode 100644
index 0000000..bb0ccc1
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -0,0 +1,14 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class GroupByKeyTranslatorBatch<K, InputT> implements BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
+      BatchTranslationContext context) {
+    // TODO: implement the translation.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
new file mode 100644
index 0000000..4477853
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -0,0 +1,13 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+
+class ParDoTranslatorBatch<InputT, OutputT> implements BatchTransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform,
+      BatchTranslationContext context) {
+    // TODO: implement the translation; OutputT is declared on the class but not used yet.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
new file mode 100644
index 0000000..a30fa70
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -0,0 +1,12 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+
+class ReadSourceTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PBegin, PCollection<T>>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(PTransform<PBegin, PCollection<T>> transform, BatchTranslationContext context) {
+    // TODO: implement the translation.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java
new file mode 100644
index 0000000..6283fdb
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java
@@ -0,0 +1,11 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.Reshuffle;
+
+class ReshuffleTranslatorBatch<K, InputT> implements BatchTransformTranslator<Reshuffle<K, InputT>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(Reshuffle<K, InputT> transform,
+      BatchTranslationContext context) {
+    // TODO: implement the translation.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
new file mode 100644
index 0000000..21b71b9
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -0,0 +1,12 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+
+class WindowAssignTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
+  /** Placeholder: the body is empty, so calling this currently has no effect. */
+  @Override public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform,
+      BatchTranslationContext context) {
+    // TODO: implement the translation.
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
similarity index 53%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
index 2058b37..547083c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
@@ -1,5 +1,7 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation;
+package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
-public class StreamingPipelineTranslator extends PipelineTranslator {
+import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 
+public class StreamingPipelineTranslator extends PipelineTranslator {
+  // TODO: implement; the class currently adds nothing to PipelineTranslator.
 }