You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/27 10:00:49 UTC

[beam] 01/03: Apply new Encoders to Window assign translation

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5beb435f46ea82ed0380e11e7751bdb6fbbbcee4
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Sep 27 11:22:15 2019 +0200

    Apply new Encoders to Window assign translation
---
 .../translation/batch/WindowAssignTranslatorBatch.java            | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch<T>
     if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
       context.putDataset(output, inputDataset);
     } else {
+      WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+          .of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
-              WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
-              EncoderHelpers.windowedValueEncoder());
+              WindowingHelpers.assignWindowsMapFunction(windowFn),
+              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
       context.putDataset(output, outputDataset);
     }
   }