You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/14 19:30:10 UTC
[2/2] incubator-beam git commit: Implement GetDefaultOutputCoder in DirectGroupByKey
Implement GetDefaultOutputCoder in DirectGroupByKey
This uses the standard Coder Inference path to set coders, rather than
explicitly setting the output coders for intermediate PCollections.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4cbccee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4cbccee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4cbccee8
Branch: refs/heads/master
Commit: 4cbccee8ee9a3b4235c6338fe49efc1f8a079812
Parents: 5a51ace
Author: Thomas Groh <tg...@google.com>
Authored: Mon Dec 12 13:55:49 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 11:29:29 2016 -0800
----------------------------------------------------------------------
.../beam/runners/direct/DirectGroupByKey.java | 36 +++++++++++---------
1 file changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4cbccee8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 405d913..6c10bd2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -46,9 +47,6 @@ class DirectGroupByKey<K, V>
@Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
// This operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
@@ -61,19 +59,11 @@ class DirectGroupByKey<K, V>
// By default, implement GroupByKey via a series of lower-level operations.
return input
.apply(new DirectGroupByKeyOnly<K, V>())
- .setCoder(
- KeyedWorkItemCoder.of(
- inputCoder.getKeyCoder(),
- inputCoder.getValueCoder(),
- inputWindowingStrategy.getWindowFn().windowCoder()))
// Group each key's values by window, merging windows as needed.
.apply(
"GroupAlsoByWindow",
- new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy))
-
- .setCoder(
- KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+ new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy));
}
static final class DirectGroupByKeyOnly<K, V>
@@ -85,6 +75,16 @@ class DirectGroupByKey<K, V>
}
DirectGroupByKeyOnly() {}
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder(
+ @SuppressWarnings("unused") PCollection<KV<K, V>> input)
+ throws CannotProvideCoderException {
+ return KeyedWorkItemCoder.of(
+ GroupByKey.getKeyCoder(input.getCoder()),
+ GroupByKey.getInputValueCoder(input.getCoder()),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+ }
}
static final class DirectGroupAlsoByWindow<K, V>
@@ -117,15 +117,19 @@ class DirectGroupByKey<K, V>
return kvCoder;
}
- public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
- return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
- }
-
public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
return getKeyedWorkItemCoder(inputCoder).getElementCoder();
}
@Override
+ protected Coder<?> getDefaultOutputCoder(
+ @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
+ throws CannotProvideCoderException {
+ KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
+ return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
+ }
+
+ @Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), outputWindowingStrategy, input.isBounded());