You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/10/10 15:48:59 UTC
[beam] 02/04: Apply new Encoders to GroupByKey
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 868204f2d2de27ab7f37e4630a0b52a60092b766
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Sep 30 12:13:25 2019 +0200
Apply new Encoders to GroupByKey
---
.../batch/GroupByKeyTranslatorBatch.java | 25 ++++++++++++++++------
1 file changed, 19 insertions(+), 6 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 3e203a8..2970aa7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
@@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch<K, V>
Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
+ WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
+ KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
+
// group by key only
+ Coder<K> keyCoder = kvCoder.getKeyCoder();
KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
- input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+ input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(
+ keyCoder));
// Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
+ Coder<V> valueCoder = kvCoder.getValueCoder();
+ WindowedValue.WindowedValueCoder<V> wvCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder());
+ IterableCoder<WindowedValue<V>> iterableCoder = IterableCoder.of(wvCoder);
Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized =
groupByKeyOnly.mapGroups(
(MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>)
@@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch<K, V>
KV.of(key, Iterables.unmodifiableIterable(values));
return kv;
},
- EncoderHelpers.kvEncoder());
+ EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder)));
- WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
- KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder();
// group also by windows
+ WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder = WindowedValue.FullWindowedValueCoder
+ .of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
+ windowingStrategy.getWindowFn().windowCoder());
Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
materialized.flatMap(
new GroupAlsoByWindowViaOutputBufferFn<>(
windowingStrategy,
new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(coder.getValueCoder()),
+ SystemReduceFn.buffering(valueCoder),
context.getSerializableOptions()),
- EncoderHelpers.windowedValueEncoder());
+ EncoderHelpers.fromBeamCoder(outputCoder));
context.putDataset(context.getOutput(), output);
}