You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 23:12:04 UTC

[1/2] incubator-beam git commit: This closes #1422

Repository: incubator-beam
Updated Branches:
  refs/heads/master b41789e9c -> dd85cad02


This closes #1422


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd85cad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd85cad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd85cad0

Branch: refs/heads/master
Commit: dd85cad0284d266fc569ecc6d98653bf8aa5d031
Parents: b41789e 1cec970
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 15:05:17 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 22 15:05:17 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java  | 5 ++++-
 .../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java  | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Output Keyed Bundles in GroupAlsoByWindowEvaluator

Posted by ke...@apache.org.
Output Keyed Bundles in GroupAlsoByWindowEvaluator

This allows reuse of keys for downstream serialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cec9702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cec9702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cec9702

Branch: refs/heads/master
Commit: 1cec9702e62b64252149645627d96889edfeb33e
Parents: b41789e
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 22 14:51:39 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 22 15:05:17 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java  | 5 ++++-
 .../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java  | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cec9702/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 0060e84..cb31947 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -310,7 +311,9 @@ public class DirectRunner
     KeyedPValueTrackingVisitor keyedPValueVisitor =
         KeyedPValueTrackingVisitor.create(
             ImmutableSet.<Class<? extends PTransform>>of(
-                GroupByKey.class, DirectGroupByKeyOnly.class));
+                GBKIntoKeyedWorkItems.class,
+                DirectGroupByKeyOnly.class,
+                DirectGroupAlsoByWindow.class));
     pipeline.traverseTopologically(keyedPValueVisitor);
 
     DisplayDataValidator.validatePipeline(pipeline);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cec9702/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b946e4d..36c742b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -112,6 +112,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow>
         windowingStrategy;
 
+    private final StructuralKey<?> structuralKey;
     private final Collection<UncommittedBundle<?>> outputBundles;
     private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
     private final AggregatorContainer.Mutator aggregatorChanges;
@@ -130,6 +131,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       this.evaluationContext = evaluationContext;
       this.application = application;
 
+      structuralKey = inputBundle.getKey();
       stepContext = evaluationContext
           .getExecutionContext(application, inputBundle.getKey())
           .getOrCreateStepContext(
@@ -159,7 +161,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
       K key = workItem.key();
 
       UncommittedBundle<KV<K, Iterable<V>>> bundle =
-          evaluationContext.createBundle(application.getOutput());
+          evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
       outputBundles.add(bundle);
       CopyOnAccessInMemoryStateInternals<K> stateInternals =
           (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();