You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/05 22:56:39 UTC

[5/8] incubator-beam git commit: fixup! Move GroupByKey expansion into DirectPipelineRunner

fixup! Move GroupByKey expansion into DirectPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf4dd82d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf4dd82d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf4dd82d

Branch: refs/heads/master
Commit: bf4dd82d2d30ede3df13abfc70db453943ce54cc
Parents: bcc010c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 12:35:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 12:35:36 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/runners/DirectPipelineRunner.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf4dd82d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 3940d32..417420a 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -1208,7 +1208,7 @@ public class DirectPipelineRunner
 
     Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
 
-    Map<DirectPipelineRunner.GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
+    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
 
     for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
       K key = elem.getValue().getKey();
@@ -1224,7 +1224,7 @@ public class DirectPipelineRunner
             " using " + keyCoder,
             exn);
       }
-      DirectPipelineRunner.GroupingKey<K> groupingKey =
+      GroupingKey<K> groupingKey =
           new GroupingKey<>(key, encodedKey);
       List<V> values = groupingMap.get(groupingKey);
       if (values == null) {
@@ -1236,8 +1236,8 @@ public class DirectPipelineRunner
 
     List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
         new ArrayList<>();
-    for (Map.Entry<DirectPipelineRunner.GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      DirectPipelineRunner.GroupingKey<K> groupingKey = entry.getKey();
+    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
+      GroupingKey<K> groupingKey = entry.getKey();
       K key = groupingKey.getKey();
       List<V> values = entry.getValue();
       values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
@@ -1266,7 +1266,7 @@ public class DirectPipelineRunner
   }
 
   static {
-    DirectPipelineRunner.registerGroupByKeyOnly();
+    registerGroupByKeyOnly();
   }
 
 }