You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:12 UTC
[04/14] beam git commit: Add RawPTransform.migrate(SdkComponents) for
re-serialization
Add RawPTransform.migrate(SdkComponents) for re-serialization
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14455ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14455ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14455ef
Branch: refs/heads/master
Commit: c14455ef4209a14a62f7b18f604c9673b32d06d7
Parents: 020ef14
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 20:28:16 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformTranslation.java | 98 +++++++++-----------
.../core/construction/PipelineTranslation.java | 36 ++++---
.../core/construction/SplittableParDo.java | 8 ++
.../core/SplittableParDoViaKeyedWorkItems.java | 7 ++
.../beam/runners/direct/DirectGroupByKey.java | 14 +++
.../beam/runners/direct/MultiStepCombine.java | 18 +++-
.../direct/ParDoMultiOverrideFactory.java | 7 ++
.../direct/TestStreamEvaluatorFactory.java | 7 ++
.../runners/direct/ViewOverrideFactory.java | 8 ++
9 files changed, 129 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index f9e7837..31767a0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -66,8 +65,8 @@ public class PTransformTranslation {
public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
/**
- * @deprecated runners should move away from translating `CreatePCollectionView` and treat this
- * as part of the translation for a `ParDo` side input.
+ * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
+ * part of the translation for a `ParDo` side input.
*/
@Deprecated
public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
@@ -85,8 +84,8 @@ public class PTransformTranslation {
Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
(Map) registrar.getTransformPayloadTranslators();
- Set<Class<? extends PTransform>> alreadyRegistered = Sets.intersection(
- translators.keySet(), newTranslators.keySet());
+ Set<Class<? extends PTransform>> alreadyRegistered =
+ Sets.intersection(translators.keySet(), newTranslators.keySet());
if (!alreadyRegistered.isEmpty()) {
throw new IllegalArgumentException(
@@ -143,20 +142,13 @@ public class PTransformTranslation {
DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
PTransform<?, ?> transform = appliedPTransform.getTransform();
+
// A RawPTransform directly vends its payload. Because it will generally be
// a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
if (transform instanceof RawPTransform) {
- RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform;
-
- if (rawPTransform.getUrn() != null) {
- FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
- @Nullable ByteString parameter = rawPTransform.getPayload();
- if (parameter != null) {
- payload.setPayload(parameter);
- }
- transformBuilder.setSpec(payload);
- }
- rawPTransform.registerComponents(components);
+ // The raw transform was parsed in the context of other components; this puts it in the
+ // context of our current serialization.
+ transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components));
} else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
FunctionSpec payload =
KNOWN_PAYLOAD_TRANSLATORS
@@ -186,9 +178,7 @@ public class PTransformTranslation {
return tag.getId();
}
- /**
- * Returns the URN for the transform if it is known, otherwise {@code null}.
- */
+ /** Returns the URN for the transform if it is known, otherwise {@code null}. */
@Nullable
public static String urnForTransformOrNull(PTransform<?, ?> transform) {
@@ -205,9 +195,7 @@ public class PTransformTranslation {
return translator.getUrn(transform);
}
- /**
- * Returns the URN for the transform if it is known, otherwise throws.
- */
+ /** Returns the URN for the transform if it is known, otherwise throws. */
public static String urnForTransform(PTransform<?, ?> transform) {
String urn = urnForTransformOrNull(transform);
if (urn == null) {
@@ -235,24 +223,48 @@ public class PTransformTranslation {
* #expand} method since the definition of the transform may be lost. The transform is already
* fully expanded in the pipeline proto.
*/
- public abstract static class RawPTransform<
- InputT extends PInput, OutputT extends POutput>
+ public abstract static class RawPTransform<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
+ /** The URN for this transform, if standardized. */
@Nullable
- public abstract String getUrn();
+ public String getUrn() {
+ return getSpec() == null ? null : getSpec().getUrn();
+ }
+ /** The payload for this transform, if any. */
@Nullable
- public ByteString getPayload() {
- return null;
+ public abstract FunctionSpec getSpec();
+
+ /**
+ * Build a new payload set in the context of the given {@link SdkComponents}, if applicable.
+ *
+ * <p>When re-serializing this transform, the ids referenced in the rehydrated payload may
+ * conflict with those defined by the serialization context. In that case, the components must
+ * be re-registered and a new payload returned.
+ */
+ public FunctionSpec migrate(SdkComponents components) throws IOException {
+ return getSpec();
}
- public void registerComponents(SdkComponents components) {}
+ /**
+ * By default, throws an exception, but can be overridden.
+ *
+ * <p>It is permissible for runner-specific transforms to be both a {@link RawPTransform} that
+ * directly vends its proto representation and also to expand, for convenience of not having to
+ * register a translator.
+ */
+ @Override
+ public OutputT expand(InputT input) {
+ throw new IllegalStateException(
+ String.format(
+ "%s should never be asked to expand;"
+ + " it is the result of deserializing an already-constructed Pipeline",
+ getClass().getSimpleName()));
+ }
}
- /**
- * A translator that uses the explicit URN and payload from a {@link RawPTransform}.
- */
+ /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
public static class RawPTransformTranslator
implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@Override
@@ -262,27 +274,9 @@ public class PTransformTranslation {
@Override
public FunctionSpec translate(
- AppliedPTransform<?, ?, RawPTransform<?, ?>> transform,
- SdkComponents components) {
-
- // Anonymous composites have no spec
- if (transform.getTransform().getUrn() == null) {
- return null;
- }
-
- FunctionSpec.Builder transformSpec =
- FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
-
- ByteString payload = transform.getTransform().getPayload();
- if (payload != null) {
- transformSpec.setPayload(payload);
- }
-
- // Transforms like Combine may have Coders that need to be added but do not
- // occur in a black-box traversal
- transform.getTransform().registerComponents(components);
-
- return transformSpec.build();
+ AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents components)
+ throws IOException {
+ return transform.getTransform().migrate(components);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 0aca837..1624865 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -24,7 +24,6 @@ import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -105,8 +104,7 @@ public class PipelineTranslation {
return DisplayData.from(component);
}
- public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
- throws IOException {
+ public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) throws IOException {
TransformHierarchy transforms = new TransformHierarchy();
Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
@@ -184,11 +182,7 @@ public class PipelineTranslation {
}
RehydratedPTransform transform =
- RehydratedPTransform.of(
- transformSpec.getUrn(),
- transformSpec.getPayload(),
- additionalInputs,
- additionalCoders);
+ RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders);
if (isPrimitive(transformProto)) {
transforms.addFinalizedPrimitiveNode(
@@ -234,32 +228,33 @@ public class PipelineTranslation {
private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
return transformProto.getSubtransformsCount() == 0
&& !transformProto
- .getInputsMap()
- .values()
- .containsAll(transformProto.getOutputsMap().values());
+ .getInputsMap()
+ .values()
+ .containsAll(transformProto.getOutputsMap().values());
}
@AutoValue
abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
@Nullable
- public abstract String getUrn();
-
- @Nullable
- public abstract ByteString getPayload();
+ public abstract RunnerApi.FunctionSpec getSpec();
@Override
public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
public abstract List<Coder<?>> getCoders();
+ @Override
+ public String getUrn() {
+ return getSpec().getUrn();
+ }
+
public static RehydratedPTransform of(
- String urn,
- ByteString payload,
+ RunnerApi.FunctionSpec payload,
Map<TupleTag<?>, PValue> additionalInputs,
List<Coder<?>> additionalCoders) {
return new AutoValue_PipelineTranslation_RehydratedPTransform(
- urn, payload, additionalInputs, additionalCoders);
+ payload, additionalInputs, additionalCoders);
}
@Override
@@ -275,12 +270,12 @@ public class PipelineTranslation {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("urn", getUrn())
- .add("payload", getPayload())
+ .add("payload", getSpec())
.toString();
}
@Override
- public void registerComponents(SdkComponents components) {
+ public RunnerApi.FunctionSpec migrate(SdkComponents components) {
for (Coder<?> coder : getCoders()) {
try {
components.registerCoder(coder);
@@ -288,6 +283,7 @@ public class PipelineTranslation {
throw new RuntimeException(e);
}
}
+ return getSpec();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 32d3409..ab66e84 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
@@ -295,6 +297,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public String getUrn() {
return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
}
+
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 251260e..400df19 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.util.List;
import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -87,6 +88,12 @@ public class SplittableParDoViaKeyedWorkItems {
public String getUrn() {
return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
}
+
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ throw new UnsupportedOperationException(
+ String.format("%s should never be serialized to proto", getClass().getSimpleName()));
+ }
}
/** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 3ba04e7..9e56b65 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -92,6 +94,12 @@ class DirectGroupByKey<K, V>
public String getUrn() {
return DIRECT_GBKO_URN;
}
+
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
}
static final class DirectGroupAlsoByWindow<K, V>
@@ -141,5 +149,11 @@ class DirectGroupByKey<K, V>
public String getUrn() {
return DIRECT_GABW_URN;
}
+
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index ae21b4d..5253ef5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -28,7 +28,9 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CombineTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -177,12 +179,18 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
this.outputCoder = outputCoder;
}
- @Nullable
+ @Nonnull
@Override
public String getUrn() {
return "urn:beam:directrunner:transforms:multistepcombine:v1";
}
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
+
@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
checkArgument(
@@ -337,11 +345,17 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), outputCoder);
}
- @Nullable
+ @Nonnull
@Override
public String getUrn() {
return DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN;
}
+
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
}
static class MergeAndExtractAccumulatorOutputEvaluatorFactory
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 26f30b0..5ec52be 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.KeyedWorkItems;
@@ -261,6 +262,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
public String getUrn() {
return DIRECT_STATEFUL_PAR_DO_URN;
}
+
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ throw new UnsupportedOperationException(
+ String.format("%s should never be serialized to proto", getClass().getSimpleName()));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 49e7be7..d62b64c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.TestStreamTranslation;
@@ -218,6 +219,12 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
public String getUrn() {
return DIRECT_TEST_STREAM_URN;
}
+
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index c2255fe..61b7978 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
import java.io.IOException;
import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -128,6 +130,12 @@ class ViewOverrideFactory<ElemT, ViewT>
public String getUrn() {
return DIRECT_WRITE_VIEW_URN;
}
+
+ @Nullable
+ @Override
+ public RunnerApi.FunctionSpec getSpec() {
+ return null;
+ }
}
public static final String DIRECT_WRITE_VIEW_URN =