You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/03/01 15:53:53 UTC
[beam] 02/02: Implement WindowAssignTranslatorBatch
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 010fcb664035eafb7f1db2e8bdc81c7f1260a003
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Mar 1 16:51:36 2019 +0100
Implement WindowAssignTranslatorBatch
---
.../batch/WindowAssignTranslatorBatch.java | 24 ++++++++-
.../translation/helpers/WindowingHelpers.java | 62 ++++++++++++++++++++--
2 files changed, 81 insertions(+), 5 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 51e21c2..b27181a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -19,13 +19,35 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Dataset;
class WindowAssignTranslatorBatch<T>
implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
@Override
public void translateTransform(
- PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {}
+ PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
+
+ Window.Assign<T> assignTransform = (Window.Assign<T>) transform;
+ @SuppressWarnings("unchecked")
+ final PCollection<T> input = (PCollection<T>) context.getInput();
+ @SuppressWarnings("unchecked")
+ final PCollection<T> output = (PCollection<T>) context.getOutput();
+
+ Dataset<WindowedValue<T>> inputDataset = context.getDataset(input);
+ if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
+ context.putDataset(output, inputDataset);
+ } else {
+ Dataset<WindowedValue<T>> outputDataset = inputDataset
+ .map(WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
+ EncoderHelpers.windowedValueEncoder());
+ context.putDataset(output, outputDataset);
+ }
+ }
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
index 45b5153..8188782 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java
@@ -17,8 +17,17 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.function.MapFunction;
+import org.joda.time.Instant;
/** Helper functions for working with windows. */
public final class WindowingHelpers {
@@ -33,8 +42,8 @@ public final class WindowingHelpers {
*/
public static <T> MapFunction<T, WindowedValue<T>> windowMapFunction() {
return new MapFunction<T, WindowedValue<T>>() {
- @Override
- public WindowedValue<T> call(T t) {
+
+ @Override public WindowedValue<T> call(T t) {
return WindowedValue.valueInGlobalWindow(t);
}
};
@@ -48,10 +57,55 @@ public final class WindowingHelpers {
*/
public static <T> MapFunction<WindowedValue<T>, T> unwindowMapFunction() {
return new MapFunction<WindowedValue<T>, T>() {
- @Override
- public T call(WindowedValue<T> t) {
+
+ @Override public T call(WindowedValue<T> t) {
return t.getValue();
}
};
}
+
+ /**
+ * Checks if the window transformation should be applied or skipped.
+ *
+ * <p>Window assignment is skipped if both source and destination use the global window or if the
+ * user has not specified a WindowFn (meaning they are only changing triggering or allowed
+ * lateness).
+ *
+ */
+ @SuppressWarnings("unchecked") public static <T, W extends BoundedWindow> boolean skipAssignWindows(
+ Window.Assign<T> transform, TranslationContext context) {
+ WindowFn<? super T, W> windowFnToApply = (WindowFn<? super T, W>) transform.getWindowFn();
+ PCollection<T> input = (PCollection<T>) context.getInput();
+ WindowFn<?, ?> windowFnOfInput = input.getWindowingStrategy().getWindowFn();
+ return windowFnToApply == null || (windowFnOfInput instanceof GlobalWindows
+ && windowFnToApply instanceof GlobalWindows);
+ }
+
+ public static <T, W extends BoundedWindow> MapFunction<WindowedValue<T>, WindowedValue<T>> assignWindowsMapFunction(
+ WindowFn<T, W> windowFn) {
+ return new MapFunction<WindowedValue<T>, WindowedValue<T>>() {
+
+ @Override public WindowedValue<T> call(WindowedValue<T> windowedValue) throws Exception {
+ final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows());
+ final T element = windowedValue.getValue();
+ final Instant timestamp = windowedValue.getTimestamp();
+ Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() {
+
+ @Override public T element() {
+ return element;
+ }
+
+ @Override public Instant timestamp() {
+ return timestamp;
+ }
+
+ @Override public BoundedWindow window() {
+ return boundedWindow;
+ }
+ });
+ return WindowedValue.of(element, timestamp, windows, windowedValue.getPane());
+ }
+ };
+ }
}
+