You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:39 UTC

[beam] 01/07: Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8a4372de576c7ac122b4383a941d406d3fb3b1ff
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed May 29 11:29:42 2019 +0200

    Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java          | 9 ++-------
 .../translation/batch/GroupByKeyTranslatorBatch.java             | 4 ++--
 .../spark/structuredstreaming/translation/helpers/KVHelpers.java | 5 +++--
 3 files changed, 7 insertions(+), 11 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 3d0ee8b..c55219b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -52,13 +52,8 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    //TODO merge windows instead of doing unwindow/window to comply with beam model
-    Dataset<KV<K, InputT>> keyedDataset =
-        inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
-
-    // TODO change extractKey impl to deal with WindowedVAlue and use it in GBK
-    KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
-        keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+    KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
+        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
 
     Dataset<Tuple2<K, OutputT>> combinedDataset =
         groupedDataset.agg(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index b2b4441..148e643 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -56,8 +57,7 @@ class GroupByKeyTranslatorBatch<K, V>
 
     //group by key only
     KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input
-        .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(),
-            EncoderHelpers.genericEncoder());
+        .groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
 
     // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
     Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
index 3bea466..a53a93f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.spark.api.java.function.MapFunction;
 import scala.Tuple2;
@@ -25,8 +26,8 @@ import scala.Tuple2;
 public final class KVHelpers {
 
   /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
-  public static <K, V> MapFunction<KV<K, V>, K> extractKey() {
-    return (MapFunction<KV<K, V>, K>) KV::getKey;
+  public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
+    return (MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey();
   }
 
   /** A Spark {@link MapFunction} for making a KV out of a {@link scala.Tuple2}. */