You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/06/26 15:22:39 UTC
[beam] 01/07: Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8a4372de576c7ac122b4383a941d406d3fb3b1ff
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed May 29 11:29:42 2019 +0200
Update KVHelpers.extractKey() to deal with WindowedValue and update GBK and CPK
---
.../translation/batch/CombinePerKeyTranslatorBatch.java | 9 ++-------
.../translation/batch/GroupByKeyTranslatorBatch.java | 4 ++--
.../spark/structuredstreaming/translation/helpers/KVHelpers.java | 5 +++--
3 files changed, 7 insertions(+), 11 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 3d0ee8b..c55219b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -52,13 +52,8 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
- //TODO merge windows instead of doing unwindow/window to comply with beam model
- Dataset<KV<K, InputT>> keyedDataset =
- inputDataset.map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder());
-
- // TODO change extractKey impl to deal with WindowedVAlue and use it in GBK
- KeyValueGroupedDataset<K, KV<K, InputT>> groupedDataset =
- keyedDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+ KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
+ inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
Dataset<Tuple2<K, OutputT>> combinedDataset =
groupedDataset.agg(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index b2b4441..148e643 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
@@ -56,8 +57,7 @@ class GroupByKeyTranslatorBatch<K, V>
//group by key only
KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input
- .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(),
- EncoderHelpers.genericEncoder());
+ .groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
// Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
index 3bea466..a53a93f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.function.MapFunction;
import scala.Tuple2;
@@ -25,8 +26,8 @@ import scala.Tuple2;
public final class KVHelpers {
/** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
- public static <K, V> MapFunction<KV<K, V>, K> extractKey() {
- return (MapFunction<KV<K, V>, K>) KV::getKey;
+ public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
+ return (MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey();
}
/** A Spark {@link MapFunction} for making a KV out of a {@link scala.Tuple2}. */