You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/27 10:00:49 UTC
[beam] 01/03: Apply new Encoders to Window assign translation
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5beb435f46ea82ed0380e11e7751bdb6fbbbcee4
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Sep 27 11:22:15 2019 +0200
Apply new Encoders to Window assign translation
---
.../translation/batch/WindowAssignTranslatorBatch.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch<T>
if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
context.putDataset(output, inputDataset);
} else {
+ WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
+ WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+ .of(input.getCoder(), windowFn.windowCoder());
Dataset<WindowedValue<T>> outputDataset =
inputDataset.map(
- WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
- EncoderHelpers.windowedValueEncoder());
+ WindowingHelpers.assignWindowsMapFunction(windowFn),
+ EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
context.putDataset(output, outputDataset);
}
}