You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/27 08:58:09 UTC

[beam] 01/03: Apply new Encoders to Read source

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit bcbb69785106dba375414291eae956276e74b0fe
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Sep 6 17:49:10 2019 +0200

    Apply new Encoders to Read source
---
 .../translation/batch/ReadSourceTranslatorBatch.java              | 8 ++++++--
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java      | 7 +++++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 6ae6646..2dcf66f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch<T>
             .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
index 6ee0e07..ac74c29 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
@@ -43,13 +43,11 @@ public final class RowHelpers {
    * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}.
    */
   public static <T> MapFunction<Row, WindowedValue<T>> extractWindowedValueFromRowMapFunction(
-      Coder<T> coder) {
+      WindowedValue.WindowedValueCoder<T> windowedValueCoder) {
     return (MapFunction<Row, WindowedValue<T>>)
         value -> {
           // there is only one value put in each Row by the InputPartitionReader
           byte[] bytes = (byte[]) value.get(0);
-          WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-              WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
           return windowedValueCoder.decode(new ByteArrayInputStream(bytes));
         };
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c3d07ff..9e03d96 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming<T>
             .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);