You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:44 UTC

[beam] 06/07: Fix encoder in combine call

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9a269eff5fe67e4542643c64e392fec003fb28f7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 17 11:53:37 2019 +0200

    Fix encoder in combine call
---
 .../translation/batch/CombineGloballyTranslatorBatch.java               | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index fb9e1dd..f29b2c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -59,7 +59,7 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
 
     Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset =
         combinedRowDataset.map(
-            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.windowedValueEncoder());
+            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
     Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
         (FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
             windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());