You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/05 22:56:39 UTC
[5/8] incubator-beam git commit: fixup! Move GroupByKey expansion into DirectPipelineRunner
fixup! Move GroupByKey expansion into DirectPipelineRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf4dd82d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf4dd82d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf4dd82d
Branch: refs/heads/master
Commit: bf4dd82d2d30ede3df13abfc70db453943ce54cc
Parents: bcc010c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 12:35:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 12:35:36 2016 -0700
----------------------------------------------------------------------
.../cloud/dataflow/sdk/runners/DirectPipelineRunner.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf4dd82d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 3940d32..417420a 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -1208,7 +1208,7 @@ public class DirectPipelineRunner
Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
- Map<DirectPipelineRunner.GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
+ Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
K key = elem.getValue().getKey();
@@ -1224,7 +1224,7 @@ public class DirectPipelineRunner
" using " + keyCoder,
exn);
}
- DirectPipelineRunner.GroupingKey<K> groupingKey =
+ GroupingKey<K> groupingKey =
new GroupingKey<>(key, encodedKey);
List<V> values = groupingMap.get(groupingKey);
if (values == null) {
@@ -1236,8 +1236,8 @@ public class DirectPipelineRunner
List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
new ArrayList<>();
- for (Map.Entry<DirectPipelineRunner.GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
- DirectPipelineRunner.GroupingKey<K> groupingKey = entry.getKey();
+ for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
+ GroupingKey<K> groupingKey = entry.getKey();
K key = groupingKey.getKey();
List<V> values = entry.getValue();
values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
@@ -1266,7 +1266,7 @@ public class DirectPipelineRunner
}
static {
- DirectPipelineRunner.registerGroupByKeyOnly();
+ registerGroupByKeyOnly();
}
}