You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/10 03:14:03 UTC
[4/5] beam git commit: Port DirectGroupByKey to SDK-agnostic APIs
Port DirectGroupByKey to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02dbaefd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02dbaefd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02dbaefd
Branch: refs/heads/master
Commit: 02dbaefd2bbad0f0ff0b87469d184137b220fae7
Parents: 8c5b57e
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:27:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/DirectGroupByKey.java | 13 +++++++------
.../direct/DirectGroupByKeyOverrideFactory.java | 14 +++++++++++---
2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 2fc0dd4..06b8e29 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -36,13 +36,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
class DirectGroupByKey<K, V>
extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
- private final GroupByKey<K, V> original;
+ private final PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original;
static final String DIRECT_GBKO_URN = "urn:beam:directrunner:transforms:gbko:v1";
static final String DIRECT_GABW_URN = "urn:beam:directrunner:transforms:gabw:v1";
+ private final WindowingStrategy<?, ?> outputWindowingStrategy;
- DirectGroupByKey(GroupByKey<K, V> from) {
- this.original = from;
+ DirectGroupByKey(
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> original,
+ WindowingStrategy<?, ?> outputWindowingStrategy) {
+ this.original = original;
+ this.outputWindowingStrategy = outputWindowingStrategy;
}
@Override
@@ -57,9 +61,6 @@ class DirectGroupByKey<K, V>
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
- // Update the windowing strategy as appropriate.
- WindowingStrategy<?, ?> outputWindowingStrategy =
- original.updateWindowingStrategy(inputWindowingStrategy);
// By default, implement GroupByKey via a series of lower-level operations.
return input
http://git-wip-us.apache.org/repos/asf/beam/blob/02dbaefd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c2eb5e7..9c2de3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,26 +17,34 @@
*/
package org.apache.beam.runners.direct;
+import com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
final class DirectGroupByKeyOverrideFactory<K, V>
extends SingleInputOutputOverrideFactory<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
+ PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
@Override
public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
+ PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
transform) {
+
+ PCollection<KV<K, Iterable<V>>> output =
+ (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(transform.getOutputs().values());
+
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new DirectGroupByKey<>(transform.getTransform()));
+ new DirectGroupByKey<>(transform.getTransform(), output.getWindowingStrategy()));
}
}