You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 14:58:10 UTC
[beam] branch spark-runner_structured-streaming updated: Add TODO in Combine translations
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 8cdd143 Add TODO in Combine translations
8cdd143 is described below
commit 8cdd143ba3a4f70763c6f2a2fff6fc2269c31536
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 28 16:57:47 2019 +0200
Add TODO in Combine translations
---
.../translation/batch/CombineGloballyTranslatorBatch.java | 1 +
.../translation/batch/CombinePerKeyTranslatorBatch.java | 2 ++
2 files changed, 3 insertions(+)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index fd66002..53651cf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -53,6 +53,7 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
Dataset<WindowedValue<InputT>> inputDataset = context.getDataset(input);
+ //TODO merge windows instead of doing unwindow/window to comply with beam model
Dataset<InputT> unWindowedDataset =
inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.genericEncoder());
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 5ff0048..3d0ee8b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -52,9 +52,11 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
+ //TODO merge windows instead of doing unwindow/window to comply with beam model
Dataset<KV<K, InputT>> keyedDataset =
inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
+ // TODO change extractKey impl to deal with WindowedVAlue and use it in GBK
KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());