You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:45 UTC

[beam] 07/07: [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f0522dc67be2ba2a6e2e1a4dcbfbc2343455ac0a
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jun 17 13:59:13 2019 +0200

    [to remove] temporary: revert extractKey while combinePerKey is not done (so that it compiles)
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java            | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index c55219b..1c35301 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.KeyValueGroupedDataset;
 import scala.Tuple2;
@@ -52,8 +53,10 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
-        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+    Dataset<KV<K, InputT>> unwindowedDataset = inputDataset
+        .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
+    KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
+        unwindowedDataset.groupByKey((MapFunction<KV<K, InputT>, K>) kv -> kv.getKey(), EncoderHelpers.genericEncoder());
 
     Dataset<Tuple2<K, OutputT>> combinedDataset =
         groupedDataset.agg(