You are viewing a plain-text version of this content. The canonical (linked) version is available in the original mailing-list archive.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:19 UTC
[07/12] beam git commit: return encoded key for GroupByKey translation
return encoded key for GroupByKey translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/364a3f08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/364a3f08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/364a3f08
Branch: refs/heads/gearpump-runner
Commit: 364a3f089747ff4761cb5b54c963c8a8013574a0
Parents: f6aaf0d
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 11:16:05 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 11:16:05 2017 +0800
----------------------------------------------------------------------
.../translators/GroupByKeyTranslator.java | 24 ++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/364a3f08/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index e16a178..ac8e218 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -22,17 +22,22 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -56,6 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = context.getInput(transform);
+ Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
@@ -64,7 +70,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
- .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
+ .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
.reduce(new Merge<K, V>(outputTimeFn), "merge")
@@ -128,11 +134,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
private static class GroupByFn<K, V> extends
- GroupByFunction<WindowedValue<KV<K, V>>, K> {
+ GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
+
+ private final Coder<K> keyCoder;
+
+ GroupByFn(Coder<K> keyCoder) {
+ this.keyCoder = keyCoder;
+ }
@Override
- public K apply(WindowedValue<KV<K, V>> wv) {
- return wv.getValue().getKey();
+ public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
+ try {
+ return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
}
}