You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/10 03:14:03 UTC

[4/5] beam git commit: Port DirectGroupByKey to SDK-agnostic APIs

Port DirectGroupByKey to SDK-agnostic APIs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02dbaefd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02dbaefd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02dbaefd

Branch: refs/heads/master
Commit: 02dbaefd2bbad0f0ff0b87469d184137b220fae7
Parents: 8c5b57e
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:27:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectGroupByKey.java  | 13 +++++++------
 .../direct/DirectGroupByKeyOverrideFactory.java       | 14 +++++++++++---
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 2fc0dd4..06b8e29 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -36,13 +36,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 class DirectGroupByKey<K, V>
     extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-  private final GroupByKey<K, V> original;
+  private final PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original;
 
   static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1";
   static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1";
+  private final WindowingStrategy<?, ?> outputWindowingStrategy;
 
-  DirectGroupByKey(GroupByKey<K, V> from) {
-    this.original = from;
+  DirectGroupByKey(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original,
+      WindowingStrategy<?, ?> outputWindowingStrategy) {
+    this.original = original;
+    this.outputWindowingStrategy = outputWindowingStrategy;
   }
 
   @Override
@@ -57,9 +61,6 @@ class DirectGroupByKey<K, V>
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
     WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
-    // Update the windowing strategy as appropriate.
-    WindowingStrategy<?, ?> outputWindowingStrategy =
-        original.updateWindowingStrategy(inputWindowingStrategy);
 
     // By default, implement GroupByKey via a series of lower-level operations.
     return input

http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c2eb5e7..9c2de3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,26 +17,34 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.collect.Iterables;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
 final class DirectGroupByKeyOverrideFactory<K, V>
     extends SingleInputOutputOverrideFactory<
-        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
+        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+        PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
   @Override
   public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
       getReplacementTransform(
           AppliedPTransform<
-                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
+                  PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+                  PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
               transform) {
+
+    PCollection<KV<K, Iterable<V>>> output =
+        (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values());
+
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
-        new DirectGroupByKey<>(transform.getTransform()));
+        new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy()));
   }
 }