You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/03/01 15:53:53 UTC

[beam] 02/02: Implement WindowAssignTranslatorBatch

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 010fcb664035eafb7f1db2e8bdc81c7f1260a003
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Mar 1 16:51:36 2019 +0100

    Implement WindowAssignTranslatorBatch
---
 .../batch/WindowAssignTranslatorBatch.java         | 24 ++++++++-
 .../translation/helpers/WindowingHelpers.java      | 62 ++++++++++++++++++++--
 2 files changed, 81 insertions(+), 5 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 51e21c2..b27181a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -19,13 +19,35 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Dataset;
 
 class WindowAssignTranslatorBatch<T>
     implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
+
+    Window.Assign<T> assignTransform = (Window.Assign<T>) transform;
+    @SuppressWarnings("unchecked")
+    final PCollection<T> input = (PCollection<T>) context.getInput();
+    @SuppressWarnings("unchecked")
+    final PCollection<T> output = (PCollection<T>) context.getOutput();
+
+    Dataset<WindowedValue<T>> inputDataset = context.getDataset(input);
+    if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
+      context.putDataset(output, inputDataset);
+    } else {
+      Dataset<WindowedValue<T>> outputDataset = inputDataset
+          .map(WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
+              EncoderHelpers.windowedValueEncoder());
+      context.putDataset(output, outputDataset);
+    }
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
index 45b5153..8188782 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
@@ -17,8 +17,17 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.function.MapFunction;
+import org.joda.time.Instant;
 
 /** Helper functions for working with windows. */
 public final class WindowingHelpers {
@@ -33,8 +42,8 @@ public final class WindowingHelpers {
    */
   public static <T> MapFunction<T, WindowedValue<T>> windowMapFunction() {
     return new MapFunction<T, WindowedValue<T>>() {
-      @Override
-      public WindowedValue<T> call(T t) {
+
+      @Override public WindowedValue<T> call(T t) {
         return WindowedValue.valueInGlobalWindow(t);
       }
     };
@@ -48,10 +57,55 @@ public final class WindowingHelpers {
    */
   public static <T> MapFunction<WindowedValue<T>, T> unwindowMapFunction() {
     return new MapFunction<WindowedValue<T>, T>() {
-      @Override
-      public T call(WindowedValue<T> t) {
+
+      @Override public T call(WindowedValue<T> t) {
         return t.getValue();
       }
     };
   }
+
+  /**
+   * Checks if the window transformation should be applied or skipped.
+   *
+   * <p>Skips window assignment if both the input and the transform use global windows, or if the
+   * user has not specified a WindowFn (meaning they are only changing triggering or allowed
+   * lateness).
+   *
+   */
+  @SuppressWarnings("unchecked") public static <T, W extends BoundedWindow> boolean skipAssignWindows(
+      Window.Assign<T> transform, TranslationContext context) {
+    WindowFn<? super T, W> windowFnToApply = (WindowFn<? super T, W>) transform.getWindowFn();
+    PCollection<T> input = (PCollection<T>) context.getInput();
+    WindowFn<?, ?> windowFnOfInput = input.getWindowingStrategy().getWindowFn();
+    return windowFnToApply == null || (windowFnOfInput instanceof GlobalWindows
+        && windowFnToApply instanceof GlobalWindows);
+  }
+
+  public static <T, W extends BoundedWindow> MapFunction<WindowedValue<T>, WindowedValue<T>> assignWindowsMapFunction(
+      WindowFn<T, W> windowFn) {
+    return new MapFunction<WindowedValue<T>, WindowedValue<T>>() {
+
+      @Override public WindowedValue<T> call(WindowedValue<T> windowedValue) throws Exception {
+        final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows());
+        final T element = windowedValue.getValue();
+        final Instant timestamp = windowedValue.getTimestamp();
+        Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() {
+
+          @Override public T element() {
+            return element;
+          }
+
+          @Override public Instant timestamp() {
+            return timestamp;
+          }
+
+          @Override public BoundedWindow window() {
+            return boundedWindow;
+          }
+        });
+        return WindowedValue.of(element, timestamp, windows, windowedValue.getPane());
+      }
+    };
+  }
 }
+