You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:30 UTC
[5/6] beam git commit: Remembers the output coders of SplittableParDo
Remembers the output coders of SplittableParDo
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48690bc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48690bc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48690bc6
Branch: refs/heads/master
Commit: 48690bc61673e767d4a1fa72e0499c32f160db39
Parents: 95e2a00
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 17:35:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 17:35:03 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../core/construction/SplittableParDo.java | 65 ++++++++++----------
.../core/construction/SplittableParDoTest.java | 33 +++++-----
.../core/SplittableParDoViaKeyedWorkItems.java | 1 +
.../direct/ParDoMultiOverrideFactory.java | 2 +-
.../dataflow/SplittableParDoOverrides.java | 2 +-
6 files changed, 56 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index cee524e..57d2593 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -379,8 +379,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
transform) {
- return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
- SplittableParDo.forJavaParDo(transform.getTransform()));
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ SplittableParDo.forAppliedParDo(transform));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index bcc5de8..32d3409 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Maps;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -74,6 +74,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
+ private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
public static final String SPLITTABLE_PROCESS_URN =
"urn:beam:runners_core:transforms:splittable_process:v1";
@@ -86,34 +87,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private SplittableParDo(
DoFn<InputT, OutputT> doFn,
- TupleTag<OutputT> mainOutputTag,
List<PCollectionView<?>> sideInputs,
- TupleTagList additionalOutputTags) {
+ TupleTag<OutputT> mainOutputTag,
+ TupleTagList additionalOutputTags,
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
checkArgument(
DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
"fn must be a splittable DoFn");
this.doFn = doFn;
- this.mainOutputTag = mainOutputTag;
this.sideInputs = sideInputs;
+ this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
- }
-
- /**
- * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
- *
- * @param parDo The splittable {@link ParDo} transform.
- */
- public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
- ParDo.MultiOutput<InputT, OutputT> parDo) {
- checkArgument(parDo != null, "parDo must not be null");
- checkArgument(
- DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
- "fn must be a splittable DoFn");
- return new SplittableParDo(
- parDo.getFn(),
- parDo.getMainOutputTag(),
- parDo.getSideInputs(),
- parDo.getAdditionalOutputTags());
+ this.outputTagsToCoders = outputTagsToCoders;
}
/**
@@ -122,15 +107,22 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* <p>The input may generally be a deserialized transform so it may not actually be a {@link
* ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
*/
- public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forAppliedParDo(
+ AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> parDo) {
checkArgument(parDo != null, "parDo must not be null");
try {
- return new SplittableParDo<>(
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : parDo.getOutputs().entrySet()) {
+ outputTagsToCoders.put(entry.getKey(), ((PCollection) entry.getValue()).getCoder());
+ }
+ return new SplittableParDo(
ParDoTranslation.getDoFn(parDo),
- (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
ParDoTranslation.getSideInputs(parDo),
- ParDoTranslation.getAdditionalOutputTags(parDo));
+ ParDoTranslation.getMainOutputTag(parDo),
+ ParDoTranslation.getAdditionalOutputTags(parDo),
+ outputTagsToCoders);
} catch (IOException exc) {
throw new RuntimeException(exc);
}
@@ -169,7 +161,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
(WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
sideInputs,
mainOutputTag,
- additionalOutputTags));
+ additionalOutputTags,
+ outputTagsToCoders));
}
@Override
@@ -203,6 +196,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
+ private final Map<TupleTag<?>, Coder<?>> outputTagsToCoders;
/**
* @param fn the splittable {@link DoFn}.
@@ -210,7 +204,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
* @param sideInputs list of side inputs that should be available to the {@link DoFn}.
* @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
* @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
- * outputs.
+ * @param outputTagsToCoders A map from output tag to the coder for that output, which should
+ * provide mappings for the main and all additional tags.
*/
public ProcessKeyedElements(
DoFn<InputT, OutputT> fn,
@@ -219,7 +214,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
WindowingStrategy<InputT, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
- TupleTagList additionalOutputTags) {
+ TupleTagList additionalOutputTags,
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
this.fn = fn;
this.elementCoder = elementCoder;
this.restrictionCoder = restrictionCoder;
@@ -227,6 +223,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
+ this.outputTagsToCoders = outputTagsToCoders;
}
public DoFn<InputT, OutputT> getFn() {
@@ -257,10 +254,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return additionalOutputTags;
}
+ public Map<TupleTag<?>, Coder<?>> getOutputTagsToCoders() {
+ return outputTagsToCoders;
+ }
+
@Override
public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) {
return createPrimitiveOutputFor(
- input, fn, mainOutputTag, additionalOutputTags, windowingStrategy);
+ input, fn, mainOutputTag, additionalOutputTags, outputTagsToCoders, windowingStrategy);
}
public static <OutputT> PCollectionTuple createPrimitiveOutputFor(
@@ -268,14 +269,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
DoFn<?, OutputT> fn,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags,
+ Map<TupleTag<?>, Coder<?>> outputTagsToCoders,
WindowingStrategy<?, ?> windowingStrategy) {
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
PCollectionTuple outputs =
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
- // TODO
- Collections.<TupleTag<?>, Coder<?>>emptyMap(),
+ outputTagsToCoders,
windowingStrategy,
input.isBounded().and(signature.isBoundedPerElement()));
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 267232c..05c471d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -29,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
@@ -106,12 +108,18 @@ public class SplittableParDoTest {
private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {};
- private ParDo.MultiOutput<Integer, String> makeParDo(DoFn<Integer, String> fn) {
- return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+ private PCollection<String> applySplittableParDo(
+ String name, PCollection<Integer> input, DoFn<Integer, String> fn) {
+ ParDo.MultiOutput<Integer, String> multiOutput =
+ ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+ PCollectionTuple output = multiOutput.expand(input);
+ output.get(MAIN_OUTPUT_TAG).setName("main");
+ AppliedPTransform<PCollection<Integer>, PCollectionTuple, ?> transform =
+ AppliedPTransform.of("ParDo", input.expand(), output.expand(), multiOutput, pipeline);
+ return input.apply(name, SplittableParDo.forAppliedParDo(transform)).get(MAIN_OUTPUT_TAG);
}
- @Rule
- public TestPipeline pipeline = TestPipeline.create();
+ @Rule public TestPipeline pipeline = TestPipeline.create();
@Test
public void testBoundednessForBoundedFn() {
@@ -121,16 +129,12 @@ public class SplittableParDoTest {
assertEquals(
"Applying a bounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.BOUNDED,
- makeBoundedCollection(pipeline)
- .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo("bounded to bounded", makeBoundedCollection(pipeline), boundedFn)
.isBounded());
assertEquals(
"Applying a bounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
- makeUnboundedCollection(pipeline)
- .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo("bounded to unbounded", makeUnboundedCollection(pipeline), boundedFn)
.isBounded());
}
@@ -142,16 +146,13 @@ public class SplittableParDoTest {
assertEquals(
"Applying an unbounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.UNBOUNDED,
- makeBoundedCollection(pipeline)
- .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo("unbounded to bounded", makeBoundedCollection(pipeline), unboundedFn)
.isBounded());
assertEquals(
"Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
- makeUnboundedCollection(pipeline)
- .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
- .get(MAIN_OUTPUT_TAG)
+ applySplittableParDo(
+ "unbounded to unbounded", makeUnboundedCollection(pipeline), unboundedFn)
.isBounded());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index af720fd..251260e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -184,6 +184,7 @@ public class SplittableParDoViaKeyedWorkItems {
original.getFn(),
original.getMainOutputTag(),
original.getAdditionalOutputTags(),
+ original.getOutputTagsToCoders(),
original.getInputWindowingStrategy());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 3f04b56..26f30b0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -96,7 +96,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
- return (PTransform) SplittableParDo.forAppliedParDo(application);
+ return SplittableParDo.forAppliedParDo((AppliedPTransform) application);
} else if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
return new GbkThenStatefulParDo(
http://git-wip-us.apache.org/repos/asf/beam/blob/48690bc6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
index fc010f8..7b65950 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -64,7 +64,7 @@ class SplittableParDoOverrides {
appliedTransform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(appliedTransform),
- SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
+ SplittableParDo.forAppliedParDo(appliedTransform));
}
@Override