You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/23 12:12:19 UTC
[beam] 02/05: Use beam encoders also in the output of the source
translation
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ad29daf87ca7a8ed8fc16ce3072d7cc7804b1867
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Oct 23 11:45:39 2019 +0200
Use beam encoders also in the output of the source translation
---
.../translation/batch/ReadSourceTranslatorBatch.java | 4 +---
.../translation/streaming/ReadSourceTranslatorStreaming.java | 4 +---
2 files changed, 2 insertions(+), 6 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index ceb87cf..6af7f55 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch<T>
Dataset<WindowedValue<T>> dataset =
rowDataset.map(
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
- // using kryo bytes serialization because the mapper already calls
- // windowedValueCoder.decode, no need to call it also in the Spark encoder
- EncoderHelpers.windowedValueEncoder());
+ EncoderHelpers.fromBeamCoder(windowedValueCoder));
PCollection<T> output = (PCollection<T>) context.getOutput();
context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9f1e34d..ea10272 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming<T>
Dataset<WindowedValue<T>> dataset =
rowDataset.map(
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
- // using kryo bytes serialization because the mapper already calls
- // windowedValueCoder.decode, no need to call it also in the Spark encoder
- EncoderHelpers.windowedValueEncoder());
+ EncoderHelpers.fromBeamCoder(windowedValueCoder));
PCollection<T> output = (PCollection<T>) context.getOutput();
context.putDataset(output, dataset);