You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/28 04:34:27 UTC
[3/6] beam git commit: Fix getAdditionalInputs for SplittableParDo transforms
Fix getAdditionalInputs for SplittableParDo transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a66bcd68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a66bcd68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a66bcd68
Branch: refs/heads/master
Commit: a66bcd68a1e56d5d38fccfce2ffeec28ba1c82de
Parents: 58fba59
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 13 10:00:09 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 27 21:08:10 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 2 +-
.../core/construction/SplittableParDo.java | 66 +++++++++++++++-----
.../core/construction/SplittableParDoTest.java | 8 +--
.../direct/ParDoMultiOverrideFactory.java | 2 +-
.../flink/FlinkStreamingPipelineTranslator.java | 2 +-
.../dataflow/SplittableParDoOverrides.java | 2 +-
6 files changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 95b354a..fd0a1c9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -381,7 +381,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
transform) {
return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
- new SplittableParDo<>(transform.getTransform()));
+ SplittableParDo.forJavaParDo(transform.getTransform()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 5ccafcb..f31b495 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.sdk.annotations.Experimental;
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -64,7 +66,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
public class SplittableParDo<InputT, OutputT, RestrictionT>
extends PTransform<PCollection<InputT>, PCollectionTuple> {
- private final ParDo.MultiOutput<InputT, OutputT> parDo;
+
+ private final DoFn<InputT, OutputT> doFn;
+ private final List<PCollectionView<?>> sideInputs;
+ private final TupleTag<OutputT> mainOutputTag;
+ private final TupleTagList additionalOutputTags;
public static final String SPLITTABLE_PROCESS_URN =
"urn:beam:runners_core:transforms:splittable_process:v1";
@@ -75,24 +81,39 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public static final String SPLITTABLE_GBKIKWI_URN =
"urn:beam:runners_core:transforms:splittable_gbkikwi:v1";
+ private SplittableParDo(
+ DoFn<InputT, OutputT> doFn,
+ TupleTag<OutputT> mainOutputTag,
+ List<PCollectionView<?>> sideInputs,
+ TupleTagList additionalOutputTags) {
+ checkArgument(
+ DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
+ "fn must be a splittable DoFn");
+ this.doFn = doFn;
+ this.mainOutputTag = mainOutputTag;
+ this.sideInputs = sideInputs;
+ this.additionalOutputTags = additionalOutputTags;
+ }
+
/**
- * Creates the transform for the given original multi-output {@link ParDo}.
+ * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
*
* @param parDo The splittable {@link ParDo} transform.
*/
- public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) {
- checkNotNull(parDo, "parDo must not be null");
- this.parDo = parDo;
- checkArgument(
- DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
- "fn must be a splittable DoFn");
+ public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
+ ParDo.MultiOutput<InputT, OutputT> parDo) {
+ checkArgument(parDo != null, "parDo must not be null");
+ return new SplittableParDo(
+ parDo.getFn(),
+ parDo.getMainOutputTag(),
+ parDo.getSideInputs(),
+ parDo.getAdditionalOutputTags());
}
@Override
public PCollectionTuple expand(PCollection<InputT> input) {
- DoFn<InputT, OutputT> fn = parDo.getFn();
Coder<RestrictionT> restrictionCoder =
- DoFnInvokers.invokerFor(fn)
+ DoFnInvokers.invokerFor(doFn)
.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder);
@@ -100,9 +121,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
input
.apply(
"Pair with initial restriction",
- ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn)))
+ ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(doFn)))
.setCoder(splitCoder)
- .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
+ .apply(
+ "Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(doFn)))
.setCoder(splitCoder)
// ProcessFn requires all input elements to be in a single window and have a single
// element per work item. This must precede the unique keying so each key has a single
@@ -115,13 +137,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return keyedRestrictions.apply(
"ProcessKeyedElements",
new ProcessKeyedElements<>(
- fn,
+ doFn,
input.getCoder(),
restrictionCoder,
(WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
- parDo.getSideInputs(),
- parDo.getMainOutputTag(),
- parDo.getAdditionalOutputTags()));
+ sideInputs,
+ mainOutputTag,
+ additionalOutputTags));
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return PCollectionViews.toAdditionalInputs(sideInputs);
}
/**
@@ -231,6 +258,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
@Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return PCollectionViews.toAdditionalInputs(sideInputs);
+ }
+
+ @Override
public String getUrn() {
return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 6e4d6c4..f4c596e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -122,14 +122,14 @@ public class SplittableParDoTest {
"Applying a bounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.BOUNDED,
makeBoundedCollection(pipeline)
- .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn)))
+ .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
.get(MAIN_OUTPUT_TAG)
.isBounded());
assertEquals(
"Applying a bounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
- .apply("bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn)))
+ .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
.get(MAIN_OUTPUT_TAG)
.isBounded());
}
@@ -143,14 +143,14 @@ public class SplittableParDoTest {
"Applying an unbounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.UNBOUNDED,
makeBoundedCollection(pipeline)
- .apply("unbounded to bounded", new SplittableParDo<>(makeParDo(unboundedFn)))
+ .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
.get(MAIN_OUTPUT_TAG)
.isBounded());
assertEquals(
"Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
- .apply("unbounded to unbounded", new SplittableParDo<>(makeParDo(unboundedFn)))
+ .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
.get(MAIN_OUTPUT_TAG)
.isBounded());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index b20113e..9a26283 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -81,7 +81,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFn<InputT, OutputT> fn = transform.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
- return new SplittableParDo(transform);
+ return (PTransform) SplittableParDo.forJavaParDo(transform);
} else if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 27bb4ec..ebc9345 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new SplittableParDo<>(transform.getTransform()));
+ SplittableParDo.forJavaParDo(transform.getTransform()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
index 9322878..fc010f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -64,7 +64,7 @@ class SplittableParDoOverrides {
appliedTransform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(appliedTransform),
- new SplittableParDo<>(appliedTransform.getTransform()));
+ SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
}
@Override