You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/27 09:13:59 UTC
[beam] 01/02: Improve performance of source: the mapper already calls windowedValueCoder.decode, so there is no need to also call it in the Spark encoder
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3ac3c717ad11248211fa0e2a0b077b1ea2602287
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Sep 19 17:20:31 2019 +0200
Improve performance of source: the mapper already calls windowedValueCoder.decode, so there is no need to also call it in the Spark encoder.
---
.../translation/batch/ReadSourceTranslatorBatch.java | 9 ++++++---
.../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++++++---
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2dcf66f..ceb87cf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch<T>
.load();
// extract windowedValue from Row
- WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
- .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
Dataset<WindowedValue<T>> dataset =
rowDataset.map(
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
- EncoderHelpers.fromBeamCoder(windowedValueCoder));
+ // using kryo bytes serialization because the mapper already calls
+ // windowedValueCoder.decode, no need to call it also in the Spark encoder
+ EncoderHelpers.windowedValueEncoder());
PCollection<T> output = (PCollection<T>) context.getOutput();
context.putDataset(output, dataset);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9e03d96..9f1e34d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming<T>
.load();
// extract windowedValue from Row
- WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
- .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
Dataset<WindowedValue<T>> dataset =
rowDataset.map(
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
- EncoderHelpers.fromBeamCoder(windowedValueCoder));
+ // using kryo bytes serialization because the mapper already calls
+ // windowedValueCoder.decode, no need to call it also in the Spark encoder
+ EncoderHelpers.windowedValueEncoder());
PCollection<T> output = (PCollection<T>) context.getOutput();
context.putDataset(output, dataset);