You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:19 UTC

[07/12] beam git commit: return encoded key for GroupByKey translation

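Return encoded key for GroupByKey translation: GroupByFn now groups by the key's coder-encoded bytes (wrapped in a ByteBuffer, using the input KvCoder's key coder) instead of by the key object itself.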


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/364a3f08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/364a3f08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/364a3f08

Branch: refs/heads/gearpump-runner
Commit: 364a3f089747ff4761cb5b54c963c8a8013574a0
Parents: f6aaf0d
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 11:16:05 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 11:16:05 2017 +0800

----------------------------------------------------------------------
 .../translators/GroupByKeyTranslator.java       | 24 ++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/364a3f08/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index e16a178..ac8e218 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -22,17 +22,22 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -56,6 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
     PCollection<KV<K, V>> input = context.getInput(transform);
+    Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
         context.getInputStream(input);
     int parallelism = context.getPipelineOptions().getParallelism();
@@ -64,7 +70,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
         .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
-        .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
+        .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
         .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
         .reduce(new Merge<K, V>(outputTimeFn), "merge")
@@ -128,11 +134,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   }
 
   private static class GroupByFn<K, V> extends
-      GroupByFunction<WindowedValue<KV<K, V>>, K> {
+      GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
+
+    private final Coder<K> keyCoder;
+
+    GroupByFn(Coder<K> keyCoder) {
+      this.keyCoder = keyCoder;
+    }
 
     @Override
-    public K apply(WindowedValue<KV<K, V>> wv) {
-      return wv.getValue().getKey();
+    public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
+      try {
+        return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
+      } catch (CoderException e) {
+        throw new RuntimeException(e);
+      }
     }
   }