You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/10 15:48:59 UTC

[beam] 02/04: Apply new Encoders to GroupByKey

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 868204f2d2de27ab7f37e4630a0b52a60092b766
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 30 12:13:25 2019 +0200

    Apply new Encoders to GroupByKey
---
 .../batch/GroupByKeyTranslatorBatch.java           | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 3e203a8..2970aa7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch<K, V>
 
     Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
 
+    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
+
     // group by key only
+    Coder<K> keyCoder = kvCoder.getKeyCoder();
     KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(
+            keyCoder));
 
     // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
+    Coder<V> valueCoder = kvCoder.getValueCoder();
+    WindowedValue.WindowedValueCoder<V> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder());
+    IterableCoder<WindowedValue<V>> iterableCoder = IterableCoder.of(wvCoder);
     Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized =
         groupByKeyOnly.mapGroups(
             (MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>)
@@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch<K, V>
                       KV.of(key, Iterables.unmodifiableIterable(values));
                   return kv;
                 },
-            EncoderHelpers.kvEncoder());
+            EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder)));
 
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder();
     // group also by windows
+    WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = WindowedValue.FullWindowedValueCoder
+        .of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
+            windowingStrategy.getWindowFn().windowCoder());
     Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
         materialized.flatMap(
             new GroupAlsoByWindowViaOutputBufferFn<>(
                 windowingStrategy,
                 new InMemoryStateInternalsFactory<>(),
-                SystemReduceFn.buffering(coder.getValueCoder()),
+                SystemReduceFn.buffering(valueCoder),
                 context.getSerializableOptions()),
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(outputCoder));
 
     context.putDataset(context.getOutput(), output);
   }