You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:38 UTC
[26/50] [abbrv] beam git commit: Enable SplittableParDo on rehydrated ParDo transform
Enable SplittableParDo on rehydrated ParDo transform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5ca058b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5ca058b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5ca058b
Branch: refs/heads/DSL_SQL
Commit: e5ca058bd7ad5f2150fef3e57649bcfb487a711f
Parents: bdece9d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 14:27:02 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700
----------------------------------------------------------------------
.../core/construction/SplittableParDo.java | 25 ++++++++++++++
.../direct/ParDoMultiOverrideFactory.java | 36 ++++++++++++++------
.../flink/FlinkStreamingPipelineTranslator.java | 2 +-
3 files changed, 52 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index f31b495..e71187b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTrans
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -103,6 +105,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo(
ParDo.MultiOutput<InputT, OutputT> parDo) {
checkArgument(parDo != null, "parDo must not be null");
+ checkArgument(
+ DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
+ "fn must be a splittable DoFn");
return new SplittableParDo(
parDo.getFn(),
parDo.getMainOutputTag(),
@@ -110,6 +115,26 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
parDo.getAdditionalOutputTags());
}
+ /**
+ * Creates the transform for a {@link ParDo}-compatible {@link AppliedPTransform}.
+ *
+ * <p>The input may generally be a deserialized transform so it may not actually be a {@link
+ * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
+ */
+ public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) {
+ checkArgument(parDo != null, "parDo must not be null");
+
+ try {
+ return new SplittableParDo<>(
+ ParDoTranslation.getDoFn(parDo),
+ (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
+ ParDoTranslation.getSideInputs(parDo),
+ ParDoTranslation.getAdditionalOutputTags(parDo));
+ } catch (IOException exc) {
+ throw new RuntimeException(exc);
+ }
+ }
+
@Override
public PCollectionTuple expand(PCollection<InputT> input) {
Coder<RestrictionT> restrictionCoder =
http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 2904bc1..8881967 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.coders.Coder;
@@ -62,29 +64,43 @@ import org.apache.beam.sdk.values.WindowingStrategy;
*/
class ParDoMultiOverrideFactory<InputT, OutputT>
implements PTransformOverrideFactory<
- PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
+ PCollection<? extends InputT>, PCollectionTuple,
+ PTransform<PCollection<? extends InputT>, PCollectionTuple>> {
@Override
public PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple>
getReplacementTransform(
AppliedPTransform<
- PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
- transform) {
+ PCollection<? extends InputT>, PCollectionTuple,
+ PTransform<PCollection<? extends InputT>, PCollectionTuple>>
+ application) {
return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- getReplacementTransform(transform.getTransform()));
+ PTransformReplacements.getSingletonMainInput(application),
+ getReplacementForApplication(application));
}
@SuppressWarnings("unchecked")
- private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(
- MultiOutput<InputT, OutputT> transform) {
+ private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication(
+ AppliedPTransform<
+ PCollection<? extends InputT>, PCollectionTuple,
+ PTransform<PCollection<? extends InputT>, PCollectionTuple>>
+ application) {
+
+ DoFn<InputT, OutputT> fn;
+ try {
+ fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
+ } catch (IOException exc) {
+ throw new RuntimeException(exc);
+ }
- DoFn<InputT, OutputT> fn = transform.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
- return (PTransform) SplittableParDo.forJavaParDo(transform);
+ return (PTransform) SplittableParDo.forAppliedParDo(application);
} else if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
+ MultiOutput<InputT, OutputT> transform =
+ (MultiOutput<InputT, OutputT>) application.getTransform();
+
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
return new GbkThenStatefulParDo(
@@ -93,7 +109,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
transform.getAdditionalOutputTags(),
transform.getSideInputs());
} else {
- return transform;
+ return application.getTransform();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index ebc9345..f733e2e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- SplittableParDo.forJavaParDo(transform.getTransform()));
+ (SplittableParDo<InputT, OutputT, ?>) SplittableParDo.forAppliedParDo(transform));
}
@Override