You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/23 06:52:44 UTC
[48/50] incubator-beam git commit: Output Keyed Bundles in GroupAlsoByWindowEvaluator
Output Keyed Bundles in GroupAlsoByWindowEvaluator
This allows reuse of keys for downstream serialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cec9702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cec9702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cec9702
Branch: refs/heads/gearpump-runner
Commit: 1cec9702e62b64252149645627d96889edfeb33e
Parents: b41789e
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 14:51:39 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 22 15:05:17 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/runners/direct/DirectRunner.java | 5 ++++-
.../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java | 4 +++-
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cec9702/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 0060e84..cb31947 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -310,7 +311,9 @@ public class DirectRunner
KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create(
ImmutableSet.<Class<? extends PTransform>>of(
- GroupByKey.class, DirectGroupByKeyOnly.class));
+ GBKIntoKeyedWorkItems.class,
+ DirectGroupByKeyOnly.class,
+ DirectGroupAlsoByWindow.class));
pipeline.traverseTopologically(keyedPValueVisitor);
DisplayDataValidator.validatePipeline(pipeline);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cec9702/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b946e4d..36c742b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -112,6 +112,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow>
windowingStrategy;
+ private final StructuralKey<?> structuralKey;
private final Collection<UncommittedBundle<?>> outputBundles;
private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
private final AggregatorContainer.Mutator aggregatorChanges;
@@ -130,6 +131,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
this.evaluationContext = evaluationContext;
this.application = application;
+ structuralKey = inputBundle.getKey();
stepContext = evaluationContext
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(
@@ -159,7 +161,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
K key = workItem.key();
UncommittedBundle<KV<K, Iterable<V>>> bundle =
- evaluationContext.createBundle(application.getOutput());
+ evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
outputBundles.add(bundle);
CopyOnAccessInMemoryStateInternals<K> stateInternals =
(CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();