You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:16 UTC
[08/14] beam git commit: Add custom rehydration for ParDo
Add custom rehydration for ParDo
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fb3e793
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fb3e793
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fb3e793
Branch: refs/heads/master
Commit: 7fb3e79328e1a9ef8340170aecd44c89e596eec5
Parents: 92209c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 3 19:17:48 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 226 ++++++++++++++++---
.../core/construction/PipelineTranslation.java | 22 --
2 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7fb3e793/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5092448..f88cbe5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -26,6 +26,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.PA
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -35,6 +36,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -75,6 +77,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -91,8 +94,7 @@ public class ParDoTranslation {
/** A {@link TransformPayloadTranslator} for {@link ParDo}. */
public static class ParDoPayloadTranslator
- extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
- ParDo.MultiOutput<?, ?>> {
+ implements TransformPayloadTranslator<MultiOutput<?, ?>> {
public static TransformPayloadTranslator create() {
return new ParDoPayloadTranslator();
}
@@ -115,6 +117,13 @@ public class ParDoTranslation {
.build();
}
+ @Override
+ public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException {
+ return new RawParDo<>(protoTransform, rehydratedComponents);
+ }
+
/** Registers {@link ParDoPayloadTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -125,41 +134,76 @@ public class ParDoTranslation {
}
@Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
+ public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.singletonMap(PAR_DO_TRANSFORM_URN, new ParDoPayloadTranslator());
}
}
}
- public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
+ public static ParDoPayload toProto(final ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
throws IOException {
- DoFn<?, ?> doFn = parDo.getFn();
- DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
- Map<String, StateDeclaration> states = signature.stateDeclarations();
- Map<String, TimerDeclaration> timers = signature.timerDeclarations();
- List<Parameter> parameters = signature.processElement().extraParameters();
-
- ParDoPayload.Builder builder = ParDoPayload.newBuilder();
- builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
- builder.setSplittable(signature.processElement().isSplittable());
- for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
- builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
- }
- for (Parameter parameter : parameters) {
- Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
- if (protoParameter.isPresent()) {
- builder.addParameters(protoParameter.get());
- }
- }
- for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
- RunnerApi.StateSpec spec = toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
- builder.putStateSpecs(state.getKey(), spec);
- }
- for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
- RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
- builder.putTimerSpecs(timer.getKey(), spec);
- }
- return builder.build();
+
+ final DoFn<?, ?> doFn = parDo.getFn();
+ final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ return payloadForParDoLike(
+ new ParDoLike() {
+ @Override
+ public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
+ return toProto(parDo.getFn(), parDo.getMainOutputTag());
+ }
+
+ @Override
+ public List<RunnerApi.Parameter> translateParameters() {
+ List<RunnerApi.Parameter> parameters = new ArrayList<>();
+ for (Parameter parameter : signature.processElement().extraParameters()) {
+ Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
+ if (protoParameter.isPresent()) {
+ parameters.add(protoParameter.get());
+ }
+ }
+ return parameters;
+ }
+
+ @Override
+ public Map<String, SideInput> translateSideInputs(SdkComponents components) {
+ Map<String, SideInput> sideInputs = new HashMap<>();
+ for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
+ sideInputs.put(sideInput.getTagInternal().getId(), toProto(sideInput));
+ }
+ return sideInputs;
+ }
+
+ @Override
+ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
+ throws IOException {
+ Map<String, RunnerApi.StateSpec> stateSpecs = new HashMap<>();
+ for (Map.Entry<String, StateDeclaration> state :
+ signature.stateDeclarations().entrySet()) {
+ RunnerApi.StateSpec spec =
+ toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
+ stateSpecs.put(state.getKey(), spec);
+ }
+ return stateSpecs;
+ }
+
+ @Override
+ public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
+ Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
+ for (Map.Entry<String, TimerDeclaration> timer :
+ signature.timerDeclarations().entrySet()) {
+ RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
+ timerSpecs.put(timer.getKey(), spec);
+ }
+ return timerSpecs;
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return signature.processElement().isSplittable();
+ }
+ },
+ components);
}
private static StateSpec<?> getStateSpecOrCrash(
@@ -603,4 +647,122 @@ public class ParDoTranslation {
SerializableUtils.deserializeFromByteArray(
spec.getPayload().toByteArray(), "Custom WinodwMappingFn");
}
+
+ static class RawParDo<InputT, OutputT>
+ extends PTransformTranslation.RawPTransform<PCollection<InputT>, PCollection<OutputT>>
+ implements ParDoLike {
+
+ private final RunnerApi.PTransform protoTransform;
+ private final transient RehydratedComponents rehydratedComponents;
+
+ // Parsed from protoTransform and cached
+ private final FunctionSpec spec;
+ private final ParDoPayload payload;
+
+ public RawParDo(RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException {
+ this.rehydratedComponents = rehydratedComponents;
+ this.protoTransform = protoTransform;
+ this.spec = protoTransform.getSpec();
+ this.payload = ParDoPayload.parseFrom(spec.getPayload());
+ }
+
+ @Override
+ public FunctionSpec getSpec() {
+ return spec;
+ }
+
+ @Override
+ public FunctionSpec migrate(SdkComponents components) throws IOException {
+ return FunctionSpec.newBuilder()
+ .setUrn(PAR_DO_TRANSFORM_URN)
+ .setPayload(payloadForParDoLike(this, components).toByteString())
+ .build();
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
+ for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+ try {
+ additionalInputs.put(
+ new TupleTag<>(sideInputEntry.getKey()),
+ rehydratedComponents.getPCollection(
+ protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
+ } catch (IOException exc) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find input with name %s for %s transform",
+ sideInputEntry.getKey(), ParDo.class.getSimpleName()));
+ }
+ }
+ return additionalInputs;
+ }
+
+ @Override
+ public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
+ // TODO: re-register the environment with the new components
+ return payload.getDoFn();
+ }
+
+ @Override
+ public List<RunnerApi.Parameter> translateParameters() {
+ return MoreObjects.firstNonNull(
+ payload.getParametersList(), Collections.<RunnerApi.Parameter>emptyList());
+ }
+
+ @Override
+ public Map<String, SideInput> translateSideInputs(SdkComponents components) {
+ // TODO: re-register the PCollections and UDF environments
+ return MoreObjects.firstNonNull(
+ payload.getSideInputsMap(), Collections.<String, SideInput>emptyMap());
+ }
+
+ @Override
+ public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components) {
+ // TODO: re-register the coders
+ return MoreObjects.firstNonNull(
+ payload.getStateSpecsMap(), Collections.<String, RunnerApi.StateSpec>emptyMap());
+ }
+
+ @Override
+ public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
+ return MoreObjects.firstNonNull(
+ payload.getTimerSpecsMap(), Collections.<String, RunnerApi.TimerSpec>emptyMap());
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return payload.getSplittable();
+ }
+ }
+
+ /** These methods drive to-proto translation from Java and from rehydrated ParDos. */
+ private interface ParDoLike {
+ SdkFunctionSpec translateDoFn(SdkComponents newComponents);
+
+ List<RunnerApi.Parameter> translateParameters();
+
+ Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components);
+
+ Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
+ throws IOException;
+
+ Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents);
+
+ boolean isSplittable();
+ }
+
+ public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents components)
+ throws IOException {
+
+ return ParDoPayload.newBuilder()
+ .setDoFn(parDo.translateDoFn(components))
+ .addAllParameters(parDo.translateParameters())
+ .putAllStateSpecs(parDo.translateStateSpecs(components))
+ .putAllTimerSpecs(parDo.translateTimerSpecs(components))
+ .putAllSideInputs(parDo.translateSideInputs(components))
+ .setSplittable(parDo.isSplittable())
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7fb3e793/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 85033e5..c8d38eb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -141,31 +141,9 @@ public class PipelineTranslation {
rehydratedComponents.getPCollection(outputEntry.getValue()));
}
- RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
RawPTransform<?, ?> transform =
PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
- // By default, no "additional" inputs, since that is an SDK-specific thing.
- // Only ParDo and WriteFiles really separate main from side inputs
- Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
-
- // TODO: ParDoTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
- if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
- RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
- additionalInputs =
- sideInputMapToAdditionalInputs(
- transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
- }
-
- // TODO: WriteFilesTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
- if (transformSpec.getUrn().equals(PTransformTranslation.WRITE_FILES_TRANSFORM_URN)) {
- RunnerApi.WriteFilesPayload payload =
- RunnerApi.WriteFilesPayload.parseFrom(transformSpec.getPayload());
- additionalInputs =
- sideInputMapToAdditionalInputs(
- transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
- }
-
if (isPrimitive(transformProto)) {
transforms.addFinalizedPrimitiveNode(
transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);