You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dm...@apache.org on 2019/03/20 21:26:30 UTC
[beam] branch master updated: Create a custom hash partitioner that
deals with arrays during combines when used in spark
This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 18820bb Create a custom hash partitioner that deals with arrays during combines when used in spark
new 32bc6da Merge pull request #8042: [BEAM-6812]: Convert keys to ByteArray in Combine.perKey to make sure hashCode is consistent
18820bb is described below
commit 18820bb49d123030a6ba2712692c2b2bb51dac6a
Author: Ankit Jhalaria <aj...@godaddy.com>
AuthorDate: Tue Mar 12 14:16:45 2019 -0700
Create a custom hash partitioner that deals with arrays during combines when used in spark
---
.../runners/spark/translation/GroupCombineFunctions.java | 12 ++++++++----
.../beam/runners/spark/translation/TranslationUtils.java | 10 +++++++---
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 95ff95a..0ec217d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -143,10 +143,10 @@ public class GroupCombineFunctions {
// Once Spark provides a way to include keys in the arguments of combine/merge functions,
// we won't need to duplicate the keys anymore.
// Key has to be windowed in order to group by window as well.
- JavaPairRDD<K, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
- rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue());
+ JavaPairRDD<ByteArray, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
+ rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue(keyCoder));
- JavaPairRDD<K, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult =
+ JavaPairRDD<ByteArray, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult =
inRddDuplicatedKeyPair.combineByKey(
input ->
SerializableAccumulator.of(sparkCombineFn.createCombiner(input), iterAccumCoder),
@@ -160,7 +160,11 @@ public class GroupCombineFunctions {
acc1.getOrDecode(iterAccumCoder), acc2.getOrDecode(iterAccumCoder)),
iterAccumCoder));
- return accumulatedResult.mapToPair(i -> new Tuple2<>(i._1, i._2.getOrDecode(iterAccumCoder)));
+ return accumulatedResult.mapToPair(
+ i ->
+ new Tuple2<>(
+ CoderHelpers.fromByteArray(i._1.getValue(), keyCoder),
+ i._2.getOrDecode(iterAccumCoder)));
}
/** An implementation of {@link Reshuffle} for the Spark runner. */
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 8186a87..35ac89a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
@@ -154,9 +155,12 @@ public final class TranslationUtils {
/** Extract key from a {@link WindowedValue} {@link KV} into a pair. */
public static <K, V>
- PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>
- toPairByKeyInWindowedValue() {
- return windowedKv -> new Tuple2<>(windowedKv.getValue().getKey(), windowedKv);
+ PairFunction<WindowedValue<KV<K, V>>, ByteArray, WindowedValue<KV<K, V>>>
+ toPairByKeyInWindowedValue(final Coder<K> keyCoder) {
+ return windowedKv ->
+ new Tuple2<>(
+ new ByteArray(CoderHelpers.toByteArray(windowedKv.getValue().getKey(), keyCoder)),
+ windowedKv);
}
/** Extract window from a {@link KV} with {@link WindowedValue} value. */