You are viewing a plain text version of this content. The canonical link for it is available in the mailing list archive.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:47 UTC

[beam] 25/50: Add Flatten transformation translator

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 286d7f36480d79ad54f2e92f0b8af8c4ba716621
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 16:02:11 2018 +0100

    Add Flatten transformation translator
---
 .../translation/TranslationContext.java            |  4 +++
 ...latorBatch.java => FlattenTranslatorBatch.java} | 35 ++++++++++++++++++++--
 .../translation/batch/PipelineTranslatorBatch.java |  2 +-
 3 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 98f77af..3c29867 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -83,6 +83,10 @@ public class TranslationContext {
   // --------------------------------------------------------------------------------------------
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<T> emptyDataset() {
+    return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class));
+  }
 
   @SuppressWarnings("unchecked")
   public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
similarity index 55%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
index 87a250e..2739e83 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -17,16 +17,47 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
 
-class FlattenPCollectionTranslatorBatch<T>
+class FlattenTranslatorBatch<T>
     implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {
+    Map<TupleTag<?>, PValue> inputs = context.getInputs();
+    Dataset<WindowedValue<T>> result = null;
+
+    if (inputs.isEmpty()) {
+      result = context.emptyDataset();
+    } else {
+      for (PValue pValue : inputs.values()) {
+        checkArgument(
+            pValue instanceof PCollection,
+            "Got non-PCollection input to flatten: %s of type %s",
+            pValue,
+            pValue.getClass().getSimpleName());
+        @SuppressWarnings("unchecked")
+        PCollection<T> pCollection = (PCollection<T>) pValue;
+        Dataset<WindowedValue<T>> current = context.getDataset(pCollection);
+        if (result == null) {
+          result = current;
+        } else {
+          result = result.union(current);
+        }
+      }
+    }
+    context.putDataset(context.getOutput(), result);
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 318d74c..26f1b9c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -56,7 +56,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
     TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
-        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
         PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());