You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/06 20:28:11 UTC
[2/3] beam git commit: Improve Splittable ParDo translation
Improve Splittable ParDo translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b00d95a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b00d95a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b00d95a
Branch: refs/heads/master
Commit: 1b00d95a1105d2611b985dc463da0884a6646354
Parents: 840492d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:29:16 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 6 13:13:12 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 20 +++++++++++
.../core/construction/SplittableParDo.java | 18 ++++++++--
.../core/construction/ParDoTranslationTest.java | 35 +++++++++++++++++++-
.../core/SplittableParDoViaKeyedWorkItems.java | 10 +++++-
.../direct/KeyedPValueTrackingVisitor.java | 2 +-
.../src/main/proto/beam_runner_api.proto | 3 ++
6 files changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fe66179..34e0d86 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -144,6 +144,7 @@ public class ParDoTranslation {
ParDoPayload.Builder builder = ParDoPayload.newBuilder();
builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
+ builder.setSplittable(signature.processElement().isSplittable());
for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
}
@@ -496,6 +497,25 @@ public class ParDoTranslation {
.build();
}
+ private static <T> ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform)
+ throws IOException {
+ return PTransformTranslation.toProto(
+ transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
+ .getSpec()
+ .getParameter()
+ .unpack(ParDoPayload.class);
+ }
+
+ public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
+ ParDoPayload payload = getParDoPayload(transform);
+ return payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0;
+ }
+
+ public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException {
+ ParDoPayload payload = getParDoPayload(transform);
+ return payload.getSplittable();
+ }
+
private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
throws InvalidProtocolBufferException {
FunctionSpec spec = viewFn.getSpec();
http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index dfca7d2..665e39d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
import java.util.UUID;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
@@ -67,6 +68,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public static final String SPLITTABLE_PROCESS_URN =
"urn:beam:runners_core:transforms:splittable_process:v1";
+ public static final String SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN =
+ "urn:beam:runners_core:transforms:splittable_process_keyed_elements:v1";
+
+ public static final String SPLITTABLE_GBKIKWI_URN =
+ "urn:beam:runners_core:transforms:splittable_gbkikwi:v1";
+
/**
* Creates the transform for the given original multi-output {@link ParDo}.
*
@@ -133,11 +140,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
/**
* Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
- * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input
- * {@link PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
+ * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input {@link
+ * PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys.
*/
public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
- extends PTransform<
+ extends RawPTransform<
PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
@@ -227,6 +234,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return outputs;
}
+
+ @Override
+ public String getUrn() {
+ return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 46f6a80..a8490bf 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -111,7 +112,11 @@ public class ParDoTranslationTest {
ParDo.of(new DropElementsFn())
.withOutputTags(
new TupleTag<Void>(),
- TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})));
+ TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})),
+ ParDo.of(new SplittableDropElementsFn())
+ .withOutputTags(
+ new TupleTag<Void>(),
+ TupleTagList.empty()));
}
@Parameter(0)
@@ -235,6 +240,34 @@ public class ParDoTranslationTest {
}
}
+ private static class SplittableDropElementsFn extends DoFn<KV<Long, String>, Void> {
+ @ProcessElement
+ public void proc(ProcessContext context, RestrictionTracker<Integer> restriction) {
+ context.output(null);
+ }
+
+ @GetInitialRestriction
+ public Integer restriction(KV<Long, String> elem) {
+ return 42;
+ }
+
+ @NewTracker
+ public RestrictionTracker<Integer> newTracker(Integer restriction) {
+ throw new UnsupportedOperationException("Should never be called; only to test translation");
+ }
+
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof SplittableDropElementsFn;
+ }
+
+ @Override
+ public int hashCode() {
+ return SplittableDropElementsFn.class.hashCode();
+ }
+ }
+
@SuppressWarnings("unused")
private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> {
private static final String BAG_STATE_ID = "bagState";
http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index b38e364..c4b086a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -23,7 +23,9 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.ElementAndRestriction;
import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -67,12 +69,18 @@ public class SplittableParDoViaKeyedWorkItems {
* emits output immediately.
*/
public static class GBKIntoKeyedWorkItems<KeyT, InputT>
- extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+ extends RawPTransform<
+ PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
@Override
public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
}
+
+ @Override
+ public String getUrn() {
+ return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
+ }
}
/** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */
http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index f9b2dae..6eadaba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.values.TupleTag;
class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults {
private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS =
- ImmutableSet.of(
+ ImmutableSet.<Class<? extends PTransform>>of(
SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class,
DirectGroupByKeyOnly.class,
DirectGroupAlsoByWindow.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 87e33f0..039ecb0 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -220,6 +220,9 @@ message ParDoPayload {
// (Optional) A mapping of local timer names to timer specifications.
map<string, TimerSpec> timer_specs = 5;
+
+ // Whether the DoFn is splittable
+ bool splittable = 6;
}
// Parameters that a UDF might require.