You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/20 20:40:23 UTC

[2/5] incubator-beam git commit: Propagate key through ParDo if DoFn is key-preserving

Propagate key through ParDo if DoFn is key-preserving


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d040b7f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d040b7f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d040b7f6

Branch: refs/heads/master
Commit: d040b7f6a3cdefde829321015c75a800901cd88f
Parents: b26ceaa
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 11:44:48 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 11:18:04 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/direct/ParDoEvaluator.java | 13 +++++++++++--
 .../beam/runners/direct/ParDoEvaluatorFactory.java     |  3 +++
 .../SplittableProcessElementsEvaluatorFactory.java     |  1 +
 .../apache/beam/runners/direct/ParDoEvaluatorTest.java |  1 +
 4 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a915cf0..a5de4c6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -47,6 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       AppliedPTransform<?, ?, ?> application,
       WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
       Serializable fn, // may be OldDoFn or DoFn
+      StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
@@ -55,8 +56,16 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
 
     Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      outputBundles.put(
-          outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
+      // Just trust the context's decision as to whether the output should be keyed.
+      // The logic for whether this ParDo is key-preserving and whether the input
+      // is keyed lives elsewhere.
+      if (evaluationContext.isKeyed(outputEntry.getValue())) {
+        outputBundles.put(
+            outputEntry.getKey(), evaluationContext.createKeyedBundle(key, outputEntry.getValue()));
+      } else {
+        outputBundles.put(
+            outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
+      }
     }
     BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b4684e3..835e6ce 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -112,6 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
         createParDoEvaluator(
             application,
+            inputBundleKey,
             sideInputs,
             mainOutputTag,
             sideOutputTags,
@@ -123,6 +124,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
 
   ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+      StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
@@ -137,6 +139,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           application,
           application.getInput().getWindowingStrategy(),
           fn,
+          key,
           sideInputs,
           mainOutputTag,
           sideOutputTags,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index aae1149..18f3909 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -91,6 +91,7 @@ class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,
+                inputBundle.getKey(),
                 transform.getSideInputs(),
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 1a3207b..b3aceeb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -164,6 +164,7 @@ public class ParDoEvaluatorTest {
         transform,
         transform.getInput().getWindowingStrategy(),
         fn,
+        null /* key */,
         ImmutableList.<PCollectionView<?>>of(singletonView),
         mainOutputTag,
         sideOutputTags,