You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/20 20:40:23 UTC
[2/5] incubator-beam git commit: Propagate key through ParDo if DoFn is key-preserving
Propagate key through ParDo if DoFn is key-preserving
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d040b7f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d040b7f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d040b7f6
Branch: refs/heads/master
Commit: d040b7f6a3cdefde829321015c75a800901cd88f
Parents: b26ceaa
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 11:44:48 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 11:18:04 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/runners/direct/ParDoEvaluator.java | 13 +++++++++++--
.../beam/runners/direct/ParDoEvaluatorFactory.java | 3 +++
.../SplittableProcessElementsEvaluatorFactory.java | 1 +
.../apache/beam/runners/direct/ParDoEvaluatorTest.java | 1 +
4 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index a915cf0..a5de4c6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -47,6 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
AppliedPTransform<?, ?, ?> application,
WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
Serializable fn, // may be OldDoFn or DoFn
+ StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
@@ -55,8 +56,16 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
- outputBundles.put(
- outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
+ // Just trust the context's decision as to whether the output should be keyed.
+ // The logic for whether this ParDo is key-preserving and whether the input
+ // is keyed lives elsewhere.
+ if (evaluationContext.isKeyed(outputEntry.getValue())) {
+ outputBundles.put(
+ outputEntry.getKey(), evaluationContext.createKeyedBundle(key, outputEntry.getValue()));
+ } else {
+ outputBundles.put(
+ outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
+ }
}
BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b4684e3..835e6ce 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -112,6 +112,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
createParDoEvaluator(
application,
+ inputBundleKey,
sideInputs,
mainOutputTag,
sideOutputTags,
@@ -123,6 +124,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+ StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
@@ -137,6 +139,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
application,
application.getInput().getWindowingStrategy(),
fn,
+ key,
sideInputs,
mainOutputTag,
sideOutputTags,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index aae1149..18f3909 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -91,6 +91,7 @@ class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT>
parDoEvaluator =
delegateFactory.createParDoEvaluator(
application,
+ inputBundle.getKey(),
transform.getSideInputs(),
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d040b7f6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 1a3207b..b3aceeb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -164,6 +164,7 @@ public class ParDoEvaluatorTest {
transform,
transform.getInput().getWindowingStrategy(),
fn,
+ null /* key */,
ImmutableList.<PCollectionView<?>>of(singletonView),
mainOutputTag,
sideOutputTags,