Posted to commits@beam.apache.org by dm...@apache.org on 2019/03/20 21:26:30 UTC

[beam] branch master updated: Create a custom hash partitioner that deals with arrays during combines when used in Spark

This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 18820bb  Create a custom hash partitioner that deals with arrays during combines when used in Spark
     new 32bc6da  Merge pull request #8042: [BEAM-6812]: Convert keys to ByteArray in Combine.perKey to make sure hashCode is consistent
18820bb is described below

commit 18820bb49d123030a6ba2712692c2b2bb51dac6a
Author: Ankit Jhalaria <aj...@godaddy.com>
AuthorDate: Tue Mar 12 14:16:45 2019 -0700

    Create a custom hash partitioner that deals with arrays during combines when used in Spark
---
 .../runners/spark/translation/GroupCombineFunctions.java     | 12 ++++++++----
 .../beam/runners/spark/translation/TranslationUtils.java     | 10 +++++++---
 2 files changed, 15 insertions(+), 7 deletions(-)

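For background on the hashCode issue named in PR #8042: Java arrays inherit identity-based equals/hashCode, so two byte[] keys with identical contents typically hash to different values, and Spark's hash partitioning can then scatter equal keys across partitions during combineByKey. The following is a minimal illustration of that behavior, not code from this commit; the class name and sample values are invented for the example.

    import java.util.Arrays;

    // Minimal sketch (not part of this patch): why raw array keys are unsafe
    // for hash partitioning. Arrays use identity-based hashCode/equals, so two
    // arrays with identical contents usually hash differently, which is why the
    // patch wraps the coder-encoded key in the Spark runner's ByteArray class.
    public class ArrayHashDemo {
      public static void main(String[] args) {
        byte[] a = new byte[] {1, 2, 3};
        byte[] b = new byte[] {1, 2, 3};

        // Identity-based comparison: not what a partitioner needs.
        System.out.println(a.equals(b));                  // false
        System.out.println(a.hashCode() == b.hashCode()); // almost always false

        // Content-based comparison: consistent for equal key contents.
        System.out.println(Arrays.equals(a, b));                        // true
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));   // true
      }
    }
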
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 95ff95a..0ec217d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -143,10 +143,10 @@ public class GroupCombineFunctions {
     // Once Spark provides a way to include keys in the arguments of combine/merge functions,
     // we won't need to duplicate the keys anymore.
     // Key has to be windowed in order to group by window as well.
-    JavaPairRDD<K, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
-        rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue());
+    JavaPairRDD<ByteArray, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
+        rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue(keyCoder));
 
-    JavaPairRDD<K, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult =
+    JavaPairRDD<ByteArray, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult =
         inRddDuplicatedKeyPair.combineByKey(
             input ->
                 SerializableAccumulator.of(sparkCombineFn.createCombiner(input), iterAccumCoder),
@@ -160,7 +160,11 @@ public class GroupCombineFunctions {
                         acc1.getOrDecode(iterAccumCoder), acc2.getOrDecode(iterAccumCoder)),
                     iterAccumCoder));
 
-    return accumulatedResult.mapToPair(i -> new Tuple2<>(i._1, i._2.getOrDecode(iterAccumCoder)));
+    return accumulatedResult.mapToPair(
+        i ->
+            new Tuple2<>(
+                CoderHelpers.fromByteArray(i._1.getValue(), keyCoder),
+                i._2.getOrDecode(iterAccumCoder)));
   }
 
   /** An implementation of {@link Reshuffle} for the Spark runner. */
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 8186a87..35ac89a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -154,9 +155,12 @@ public final class TranslationUtils {
 
   /** Extract key from a {@link WindowedValue} {@link KV} into a pair. */
   public static <K, V>
-      PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>>
-          toPairByKeyInWindowedValue() {
-    return windowedKv -> new Tuple2<>(windowedKv.getValue().getKey(), windowedKv);
+      PairFunction<WindowedValue<KV<K, V>>, ByteArray, WindowedValue<KV<K, V>>>
+          toPairByKeyInWindowedValue(final Coder<K> keyCoder) {
+    return windowedKv ->
+        new Tuple2<>(
+            new ByteArray(CoderHelpers.toByteArray(windowedKv.getValue().getKey(), keyCoder)),
+            windowedKv);
   }
 
   /** Extract window from a {@link KV} with {@link WindowedValue} value. */
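
As a usage note on the two changes above, here is a minimal sketch of the key round trip, assuming a StringUtf8Coder key and assuming the Spark runner's ByteArray wrapper provides content-based equals/hashCode over the encoded bytes (the property the combineByKey grouping relies on). The class name and sample key are invented for illustration; CoderHelpers.toByteArray/fromByteArray, ByteArray, and getValue() are used exactly as they appear in the diff.

    import org.apache.beam.runners.spark.coders.CoderHelpers;
    import org.apache.beam.runners.spark.util.ByteArray;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    // Minimal sketch (not from the patch) of the key round trip introduced above:
    // encode the key to bytes, wrap it in ByteArray so hash partitioning sees
    // content-based equals/hashCode, then decode it back after combineByKey.
    public class KeyRoundTripSketch {
      public static void main(String[] args) {
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        String key = "user-42"; // hypothetical sample key

        // What toPairByKeyInWindowedValue(keyCoder) now does with each element's key.
        ByteArray wrapped = new ByteArray(CoderHelpers.toByteArray(key, keyCoder));

        // What the final mapToPair does after combineByKey: recover the original key.
        String decoded = CoderHelpers.fromByteArray(wrapped.getValue(), keyCoder);

        System.out.println(key.equals(decoded)); // true
      }
    }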