You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/03/01 15:53:51 UTC

[beam] branch spark-runner_structured-streaming updated (46914af -> 010fcb6)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 46914af  [TO UPGRADE WITH THE 2 SPARK RUNNERS BEFORE MERGE] Change the wordcount build to test on new spark runner
     new 818dc87  Cleaning
     new 010fcb6  Implement WindowAssignTranslatorBatch

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../batch/CombineGloballyTranslatorBatch.java      | 10 ----
 .../batch/WindowAssignTranslatorBatch.java         | 24 ++++++++-
 .../translation/helpers/WindowingHelpers.java      | 62 ++++++++++++++++++++--
 3 files changed, 81 insertions(+), 15 deletions(-)


[beam] 01/02: Cleaning

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 818dc870ca15370e8c6f6cdd91151853bbf016ef
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Mar 1 11:41:34 2019 +0100

    Cleaning
---
 .../translation/batch/CombineGloballyTranslatorBatch.java      | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index 80ca4b9..48024b6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -52,30 +52,20 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<InputT>> inputDataset = context.getDataset(input);
 
-    System.out.println("****** inputDataset ******" + inputDataset.schema());
-
     Dataset<InputT> unWindowedDataset =
         inputDataset.map(
             WindowingHelpers.unwindowMapFunction(), EncoderHelpers.genericEncoder());
 
-    System.out.println("****** unWindowedDataset ******" + unWindowedDataset.schema());
-
     Dataset<Row> combinedRowDataset = unWindowedDataset
         .agg(new AggregatorCombinerGlobally<>(combineFn).toColumn());
 
-    System.out.println("*****combinedRowDataset*******" + combinedRowDataset.schema());
-
     Dataset<OutputT> combinedDataset = combinedRowDataset
         .map(RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
 
-    System.out.println("****** combinedDataset ******" + combinedDataset.schema());
-
     // Window the result into global window.
     Dataset<WindowedValue<OutputT>> outputDataset = combinedDataset
         .map(WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder());
 
-    System.out.println("****** outputDataset ******" + outputDataset.schema());
-
     context.putDataset(output, outputDataset);
   }
 }


[beam] 02/02: Implement WindowAssignTranslatorBatch

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 010fcb664035eafb7f1db2e8bdc81c7f1260a003
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Mar 1 16:51:36 2019 +0100

    Implement WindowAssignTranslatorBatch
---
 .../batch/WindowAssignTranslatorBatch.java         | 24 ++++++++-
 .../translation/helpers/WindowingHelpers.java      | 62 ++++++++++++++++++++--
 2 files changed, 81 insertions(+), 5 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 51e21c2..b27181a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -19,13 +19,35 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Dataset;
 
 class WindowAssignTranslatorBatch<T>
     implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
+
+    Window.Assign<T> assignTransform = (Window.Assign<T>) transform;
+    @SuppressWarnings("unchecked")
+    final PCollection<T> input = (PCollection<T>) context.getInput();
+    @SuppressWarnings("unchecked")
+    final PCollection<T> output = (PCollection<T>) context.getOutput();
+
+    Dataset<WindowedValue<T>> inputDataset = context.getDataset(input);
+    if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
+      context.putDataset(output, inputDataset);
+    } else {
+      Dataset<WindowedValue<T>> outputDataset = inputDataset
+          .map(WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
+              EncoderHelpers.windowedValueEncoder());
+      context.putDataset(output, outputDataset);
+    }
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
index 45b5153..8188782 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
@@ -17,8 +17,17 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.function.MapFunction;
+import org.joda.time.Instant;
 
 /** Helper functions for working with windows. */
 public final class WindowingHelpers {
@@ -33,8 +42,8 @@ public final class WindowingHelpers {
    */
   public static <T> MapFunction<T, WindowedValue<T>> windowMapFunction() {
     return new MapFunction<T, WindowedValue<T>>() {
-      @Override
-      public WindowedValue<T> call(T t) {
+
+      @Override public WindowedValue<T> call(T t) {
         return WindowedValue.valueInGlobalWindow(t);
       }
     };
@@ -48,10 +57,55 @@ public final class WindowingHelpers {
    */
   public static <T> MapFunction<WindowedValue<T>, T> unwindowMapFunction() {
     return new MapFunction<WindowedValue<T>, T>() {
-      @Override
-      public T call(WindowedValue<T> t) {
+
+      @Override public T call(WindowedValue<T> t) {
         return t.getValue();
       }
     };
   }
+
+  /**
+   * Checks if the window transformation should be applied or skipped.
+   *
+   * <p>Skips window assignment if both the input and the WindowFn to apply are global windows, or
+   * if the user has not specified a WindowFn (meaning they are only changing triggering or allowed
+   * lateness).
+   *
+   */
+  @SuppressWarnings("unchecked") public static <T, W extends BoundedWindow> boolean skipAssignWindows(
+      Window.Assign<T> transform, TranslationContext context) {
+    WindowFn<? super T, W> windowFnToApply = (WindowFn<? super T, W>) transform.getWindowFn();
+    PCollection<T> input = (PCollection<T>) context.getInput();
+    WindowFn<?, ?> windowFnOfInput = input.getWindowingStrategy().getWindowFn();
+    return windowFnToApply == null || (windowFnOfInput instanceof GlobalWindows
+        && windowFnToApply instanceof GlobalWindows);
+  }
+
+  public static <T, W extends BoundedWindow> MapFunction<WindowedValue<T>, WindowedValue<T>> assignWindowsMapFunction(
+      WindowFn<T, W> windowFn) {
+    return new MapFunction<WindowedValue<T>, WindowedValue<T>>() {
+
+      @Override public WindowedValue<T> call(WindowedValue<T> windowedValue) throws Exception {
+        final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows());
+        final T element = windowedValue.getValue();
+        final Instant timestamp = windowedValue.getTimestamp();
+        Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() {
+
+          @Override public T element() {
+            return element;
+          }
+
+          @Override public Instant timestamp() {
+            return timestamp;
+          }
+
+          @Override public BoundedWindow window() {
+            return boundedWindow;
+          }
+        });
+        return WindowedValue.of(element, timestamp, windows, windowedValue.getPane());
+      }
+    };
+  }
 }
+