You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/14 19:30:09 UTC

[1/2] incubator-beam git commit: This closes #1583

Repository: incubator-beam
Updated Branches:
  refs/heads/master 5a51ace8d -> fdf07318f


This closes #1583


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fdf07318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fdf07318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fdf07318

Branch: refs/heads/master
Commit: fdf07318f6833bad4634b59e3676033288f0c4aa
Parents: 5a51ace 4cbccee
Author: Thomas Groh <tg...@google.com>
Authored: Wed Dec 14 11:29:29 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 11:29:29 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectGroupByKey.java   | 36 +++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Implement GetDefaultOutputCoder in DirectGroupByKey

Posted by tg...@apache.org.
Implement getDefaultOutputCoder in DirectGroupByKey

This uses the standard coder-inference path (each transform overrides
getDefaultOutputCoder) to set coders, rather than explicitly calling
setCoder on the intermediate PCollections.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4cbccee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4cbccee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4cbccee8

Branch: refs/heads/master
Commit: 4cbccee8ee9a3b4235c6338fe49efc1f8a079812
Parents: 5a51ace
Author: Thomas Groh <tg...@google.com>
Authored: Mon Dec 12 13:55:49 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 14 11:29:29 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectGroupByKey.java   | 36 +++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4cbccee8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 405d913..6c10bd2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -46,9 +47,6 @@ class DirectGroupByKey<K, V>
 
   @Override
   public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
     // This operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
@@ -61,19 +59,11 @@ class DirectGroupByKey<K, V>
     // By default, implement GroupByKey via a series of lower-level operations.
     return input
         .apply(new DirectGroupByKeyOnly<K, V>())
-        .setCoder(
-            KeyedWorkItemCoder.of(
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder(),
-                inputWindowingStrategy.getWindowFn().windowCoder()))
 
         // Group each key's values by window, merging windows as needed.
         .apply(
             "GroupAlsoByWindow",
-            new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy))
-
-        .setCoder(
-            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+            new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy));
   }
 
   static final class DirectGroupByKeyOnly<K, V>
@@ -85,6 +75,16 @@ class DirectGroupByKey<K, V>
     }
 
     DirectGroupByKeyOnly() {}
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder(
+        @SuppressWarnings("unused") PCollection<KV<K, V>> input)
+        throws CannotProvideCoderException {
+      return KeyedWorkItemCoder.of(
+          GroupByKey.getKeyCoder(input.getCoder()),
+          GroupByKey.getInputValueCoder(input.getCoder()),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
@@ -117,15 +117,19 @@ class DirectGroupByKey<K, V>
       return kvCoder;
     }
 
-    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
-      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
-    }
-
     public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
       return getKeyedWorkItemCoder(inputCoder).getElementCoder();
     }
 
     @Override
+    protected Coder<?> getDefaultOutputCoder(
+        @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
+        throws CannotProvideCoderException {
+      KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
+      return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
+    }
+
+    @Override
     public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), outputWindowingStrategy, input.isBounded());