Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/29 15:13:12 UTC

[beam] branch spark-runner_structured-streaming updated (84e73e5 -> 5ca19f2)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


 discard 84e73e5  Create Datasets manipulation methods
 discard ca89100  Create PCollections manipulation methods
     new 26238ce  Create PCollections manipulation methods
     new 59acff8  Create Datasets manipulation methods
     new 5ca19f2  Add Flatten transformation translator

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (84e73e5)
            \
             N -- N -- N   refs/heads/spark-runner_structured-streaming (5ca19f2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
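
For anyone who wants to reproduce a non-fast-forward update like this one
locally, here is a minimal JGit sketch (the clone path is a hypothetical
placeholder; the leading '+' on the refspec is what authorizes the
non-fast-forward push that discards the O revisions in favor of the N ones):

    import java.io.File;
    import org.eclipse.jgit.api.Git;
    import org.eclipse.jgit.transport.RefSpec;

    public class ForcePushSketch {
      public static void main(String[] args) throws Exception {
        // Open an existing local clone of the repository (path is a placeholder).
        try (Git git = Git.open(new File("/path/to/beam"))) {
          git.push()
              .setRemote("origin")
              // '+' permits the remote ref to move to a non-descendant commit.
              .setRefSpecs(
                  new RefSpec(
                      "+refs/heads/spark-runner_structured-streaming"
                          + ":refs/heads/spark-runner_structured-streaming"))
              .call();
        }
      }
    }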


Summary of changes:
 .../translation/TranslationContext.java            | 28 ++++++++---------
 ...latorBatch.java => FlattenTranslatorBatch.java} | 35 ++++++++++++++++++++--
 .../translation/batch/PipelineTranslatorBatch.java |  2 +-
 3 files changed, 47 insertions(+), 18 deletions(-)
 rename runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/{FlattenPCollectionTranslatorBatch.java => FlattenTranslatorBatch.java} (55%)


[beam] 03/03: Add Flatten transformation translator

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5ca19f255eb3bf34007fcf276cfa3977d52af832
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 16:02:11 2018 +0100

    Add Flatten transformation translator
---
 .../translation/TranslationContext.java            |  4 +++
 ...latorBatch.java => FlattenTranslatorBatch.java} | 35 ++++++++++++++++++++--
 .../translation/batch/PipelineTranslatorBatch.java |  2 +-
 3 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 98f77af..3c29867 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -83,6 +83,10 @@ public class TranslationContext {
   // --------------------------------------------------------------------------------------------
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<T> emptyDataset() {
+    return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class));
+  }
 
   @SuppressWarnings("unchecked")
   public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
similarity index 55%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
index 87a250e..2739e83 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -17,16 +17,47 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
 
-class FlattenPCollectionTranslatorBatch<T>
+class FlattenTranslatorBatch<T>
     implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {
+    Map<TupleTag<?>, PValue> inputs = context.getInputs();
+    Dataset<WindowedValue<T>> result = null;
+
+    if (inputs.isEmpty()) {
+      result = context.emptyDataset();
+    } else {
+      for (PValue pValue : inputs.values()) {
+        checkArgument(
+            pValue instanceof PCollection,
+            "Got non-PCollection input to flatten: %s of type %s",
+            pValue,
+            pValue.getClass().getSimpleName());
+        @SuppressWarnings("unchecked")
+        PCollection<T> pCollection = (PCollection<T>) pValue;
+        Dataset<WindowedValue<T>> current = context.getDataset(pCollection);
+        if (result == null) {
+          result = current;
+        } else {
+          result = result.union(current);
+        }
+      }
+    }
+    context.putDataset(context.getOutput(), result);
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 318d74c..26f1b9c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -56,7 +56,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
     TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
-        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+        PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
         PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
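
For context, here is a minimal sketch of a pipeline that exercises this new
translator (the pipeline itself is illustrative and not part of the commit;
Flatten.pCollections() is the Beam transform whose FLATTEN_TRANSFORM_URN is
now routed to FlattenTranslatorBatch):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    public class FlattenSketch {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<String> first = pipeline.apply("First", Create.of("a", "b"));
        PCollection<String> second = pipeline.apply("Second", Create.of("c", "d"));

        // On the structured-streaming runner, FlattenTranslatorBatch turns
        // this into a union of the two underlying Spark Datasets; with an
        // empty PCollectionList it would produce context.emptyDataset().
        PCollection<String> merged =
            PCollectionList.of(first).and(second).apply(Flatten.pCollections());

        pipeline.run().waitUntilFinish();
      }
    }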


[beam] 01/03: Create PCollections manipulation methods

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 26238ce88d2cdca169587ac55547c146f267a39b
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 11:48:20 2018 +0100

    Create PCollections manipulation methods
---
 .../translation/TranslationContext.java            | 56 +++++++++++++++++++++-
 1 file changed, 55 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 71ae276..a3276bf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -17,17 +17,25 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
+import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.StreamingQueryException;
@@ -40,6 +48,7 @@ public class TranslationContext {
 
   private final Map<PValue, Dataset<?>> datasets;
   private final Set<Dataset<?>> leaves;
+
   private final SparkPipelineOptions options;
 
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
@@ -62,10 +71,55 @@ public class TranslationContext {
     this.leaves = new LinkedHashSet<>();
   }
 
+  // --------------------------------------------------------------------------------------------
+  //  Transforms methods
+  // --------------------------------------------------------------------------------------------
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
     this.currentTransform = currentTransform;
   }
 
+  // --------------------------------------------------------------------------------------------
+  //  Datasets methods
+  // --------------------------------------------------------------------------------------------
+
+
+  // --------------------------------------------------------------------------------------------
+  //  PCollections methods
+  // --------------------------------------------------------------------------------------------
+  @SuppressWarnings("unchecked")
+  public PValue getInput() {
+    return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<TupleTag<?>, PValue> getInputs() {
+    return currentTransform.getInputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public PValue getOutput() {
+    return Iterables.getOnlyElement(currentTransform.getOutputs().values());
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<TupleTag<?>, PValue> getOutputs() {
+    return currentTransform.getOutputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
+    return currentTransform
+        .getOutputs()
+        .entrySet()
+        .stream()
+        .filter(e -> e.getValue() instanceof PCollection)
+        .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline methods
+  // --------------------------------------------------------------------------------------------
+
   public void startPipeline() {
     try {
       // to start a pipeline we need a DatastreamWriter to start
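
To illustrate how these accessors are meant to be consumed, here is a
hypothetical single-input, single-output translator (IdentityTranslatorBatch
is an invented name for illustration; getDataset/putDataset are the Dataset
methods added in the next commit):

    import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
    import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.spark.sql.Dataset;

    // Hypothetical translator that passes its single input through unchanged.
    class IdentityTranslatorBatch<T>
        implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {

      @Override
      public void translateTransform(
          PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
        // getInput()/getOutput() return the sole input/output PValue of the
        // transform currently being translated.
        Dataset<WindowedValue<T>> input = context.getDataset(context.getInput());
        context.putDataset(context.getOutput(), input);
      }
    }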


[beam] 02/03: Create Datasets manipulation methods

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 59acff8bd5d14254c0d5c2fd496cdbd96a4cab24
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 16:11:35 2018 +0100

    Create Datasets manipulation methods
---
 .../translation/TranslationContext.java                | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index a3276bf..98f77af 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -46,7 +46,9 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
  */
 public class TranslationContext {
 
+  /** All the datasets of the DAG. */
   private final Map<PValue, Dataset<?>> datasets;
+  /** Datasets that are not consumed as input by any other transformation (leaves of the DAG). */
   private final Set<Dataset<?>> leaves;
 
   private final SparkPipelineOptions options;
@@ -68,7 +70,7 @@ public class TranslationContext {
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.options = options;
     this.datasets = new HashMap<>();
-    this.leaves = new LinkedHashSet<>();
+    this.leaves = new HashSet<>();
   }
 
   // --------------------------------------------------------------------------------------------
@@ -82,6 +84,20 @@ public class TranslationContext {
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
 
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
+    Dataset<?> dataset = datasets.get(value);
+    // A Dataset retrieved here is assumed to be consumed as an input, so it is no longer a leaf.
+    leaves.remove(dataset);
+    return (Dataset<WindowedValue<T>>) dataset;
+  }
+
+  public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) {
+    if (!datasets.containsKey(value)) {
+      datasets.put(value, dataset);
+      leaves.add(dataset);
+    }
+  }
 
   // --------------------------------------------------------------------------------------------
   //  PCollections methods
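
To make the leaf bookkeeping concrete, here is a small self-contained model
of the two methods above (types are simplified to String; the real code keys
the map by PValue and stores Spark Datasets):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class LeavesModel {
      private final Map<String, String> datasets = new HashMap<>();
      private final Set<String> leaves = new HashSet<>();

      String getDataset(String key) {
        String dataset = datasets.get(key);
        // Retrieving a dataset means it is consumed as an input: not a leaf.
        leaves.remove(dataset);
        return dataset;
      }

      void putDataset(String key, String dataset) {
        if (!datasets.containsKey(key)) {
          datasets.put(key, dataset);
          leaves.add(dataset); // every new dataset starts out as a leaf
        }
      }

      public static void main(String[] args) {
        LeavesModel ctx = new LeavesModel();
        ctx.putDataset("a", "dsA");        // leaves = [dsA]
        String in = ctx.getDataset("a");   // leaves = []
        ctx.putDataset("b", in + "->dsB"); // leaves = [dsA->dsB]
        System.out.println(ctx.leaves);    // prints [dsA->dsB]
      }
    }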