You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:13 UTC
[05/14] beam git commit: Add TransformPayloadTranslator.rehydrate to
optionally specialize RawPTransform
Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10c63e15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10c63e15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10c63e15
Branch: refs/heads/master
Commit: 10c63e15ab51b885372f7b6251d8ace63bae0ad1
Parents: c14455e
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 19:25:28 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700
----------------------------------------------------------------------
.../core/construction/CombineTranslation.java | 33 ++--
.../CreatePCollectionViewTranslation.java | 7 +-
.../core/construction/FlattenTranslator.java | 8 +-
.../construction/GroupByKeyTranslation.java | 13 +-
.../construction/PTransformTranslation.java | 155 ++++++++++++++++++-
.../core/construction/ParDoTranslation.java | 70 ++++-----
.../core/construction/PipelineTranslation.java | 76 +--------
.../core/construction/ReadTranslation.java | 43 +++--
.../construction/TestStreamTranslation.java | 8 +-
.../TransformPayloadTranslatorRegistrar.java | 2 +
.../construction/WindowIntoTranslation.java | 15 +-
.../construction/WriteFilesTranslation.java | 16 +-
.../direct/TransformEvaluatorRegistry.java | 18 +--
runners/flink/pom.xml | 5 -
.../FlinkStreamingTransformTranslators.java | 60 ++-----
15 files changed, 280 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index c3d9553..69591ee 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -54,11 +54,10 @@ import org.apache.beam.sdk.values.PCollection;
public class CombineTranslation {
public static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:combinefn:javasdk:v1";
- /**
- * A {@link TransformPayloadTranslator} for {@link Combine.PerKey}.
- */
+ /** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */
public static class CombinePayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> {
+ extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+ Combine.PerKey<?, ?, ?>> {
public static TransformPayloadTranslator create() {
return new CombinePayloadTranslator();
}
@@ -81,9 +80,7 @@ public class CombineTranslation {
.build();
}
- /**
- * Registers {@link CombinePayloadTranslator}.
- */
+ /** Registers {@link CombinePayloadTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@Override
@@ -91,6 +88,11 @@ public class CombineTranslation {
getTransformPayloadTranslators() {
return Collections.singletonMap(Combine.PerKey.class, new CombinePayloadTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
@@ -136,8 +138,7 @@ public class CombineTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
- .setPayload(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
+ .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
.build())
.build();
}
@@ -148,8 +149,8 @@ public class CombineTranslation {
return components.getCoder(id);
}
- public static Coder<?> getAccumulatorCoder(
- AppliedPTransform<?, ?, ?> transform) throws IOException {
+ public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> transform)
+ throws IOException {
SdkComponents sdkComponents = SdkComponents.create();
String id = getCombinePayload(transform, sdkComponents).getAccumulatorCoderId();
Components components = sdkComponents.toComponents();
@@ -157,17 +158,11 @@ public class CombineTranslation {
components.getCodersOrThrow(id), RehydratedComponents.forComponents(components));
}
- public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload)
- throws IOException {
+ public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) throws IOException {
checkArgument(payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN));
return (GlobalCombineFn<?, ?, ?>)
SerializableUtils.deserializeFromByteArray(
- payload
- .getCombineFn()
- .getSpec()
- .getPayload()
- .toByteArray(),
- "CombineFn");
+ payload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
}
public static GlobalCombineFn<?, ?, ?> getCombineFn(AppliedPTransform<?, ?, ?> transform)
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index 4b8edcf..709cb8a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -88,7 +88,7 @@ public class CreatePCollectionViewTranslation {
*/
@Deprecated
static class CreatePCollectionViewTranslator
- implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
+ extends TransformPayloadTranslator.WithDefaultRehydration<View.CreatePCollectionView<?, ?>> {
@Override
public String getUrn(View.CreatePCollectionView<?, ?> transform) {
return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
@@ -122,5 +122,10 @@ public class CreatePCollectionViewTranslation {
return Collections.singletonMap(
View.CreatePCollectionView.class, new CreatePCollectionViewTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
index c9798e6..972c453 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
@@ -32,7 +32,8 @@ import org.apache.beam.sdk.transforms.windowing.Window.Assign;
/**
* Utility methods for translating a {@link Assign} to and from {@link RunnerApi} representations.
*/
-public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCollections<?>> {
+public class FlattenTranslator
+ extends TransformPayloadTranslator.WithDefaultRehydration<Flatten.PCollections<?>> {
public static TransformPayloadTranslator create() {
return new FlattenTranslator();
@@ -59,5 +60,10 @@ public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCo
getTransformPayloadTranslators() {
return Collections.singletonMap(Flatten.PCollections.class, new FlattenTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
index 840bae2..0803ad3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
@@ -34,7 +34,8 @@ import org.apache.beam.sdk.transforms.PTransform;
*/
public class GroupByKeyTranslation {
- static class GroupByKeyTranslator implements TransformPayloadTranslator<GroupByKey<?, ?>> {
+ static class GroupByKeyTranslator
+ extends TransformPayloadTranslator.WithDefaultRehydration<GroupByKey<?, ?>> {
@Override
public String getUrn(GroupByKey<?, ?> transform) {
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
@@ -43,13 +44,10 @@ public class GroupByKeyTranslation {
@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, GroupByKey<?, ?>> transform, SdkComponents components) {
- return FunctionSpec.newBuilder()
- .setUrn(getUrn(transform.getTransform()))
- .build();
+ return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
}
}
-
/** Registers {@link GroupByKeyTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -58,5 +56,10 @@ public class GroupByKeyTranslation {
getTransformPayloadTranslators() {
return Collections.singletonMap(GroupByKey.class, new GroupByKeyTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 31767a0..785b9e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -20,7 +20,9 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.auto.value.AutoValue;
import com.google.common.base.Joiner;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
@@ -74,6 +76,12 @@ public class PTransformTranslation {
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+ private static final Map<String, TransformPayloadTranslator> KNOWN_REHYDRATORS =
+ loadTransformRehydrators();
+
+ private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR =
+ new RawPTransformTranslator();
+
private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>();
@@ -98,6 +106,29 @@ public class PTransformTranslation {
return ImmutableMap.copyOf(translators);
}
+ private static Map<String, TransformPayloadTranslator> loadTransformRehydrators() {
+ HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>();
+
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+ Map<String, ? extends TransformPayloadTranslator> newRehydrators =
+ registrar.getTransformRehydrators();
+
+ Set<String> alreadyRegistered =
+ Sets.intersection(rehydrators.keySet(), newRehydrators.keySet());
+
+ if (!alreadyRegistered.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "URNs already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
+ }
+
+ rehydrators.putAll(newRehydrators);
+ }
+ return ImmutableMap.copyOf(rehydrators);
+ }
+
private PTransformTranslation() {}
/**
@@ -150,17 +181,36 @@ public class PTransformTranslation {
// context of our current serialization
transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components));
} else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
- FunctionSpec payload =
+ transformBuilder.setSpec(
KNOWN_PAYLOAD_TRANSLATORS
.get(transform.getClass())
- .translate(appliedPTransform, components);
- transformBuilder.setSpec(payload);
+ .translate(appliedPTransform, components));
}
return transformBuilder.build();
}
/**
+ * Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform} specialized for the URN
+ * and spec.
+ */
+ static RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException {
+
+ @Nullable
+ TransformPayloadTranslator<?> rehydrator =
+ KNOWN_REHYDRATORS.get(
+ protoTransform.getSpec() == null ? null : protoTransform.getSpec().getUrn());
+
+ if (rehydrator == null) {
+ return DEFAULT_REHYDRATOR.rehydrate(protoTransform, rehydratedComponents);
+ } else {
+ return rehydrator.rehydrate(protoTransform, rehydratedComponents);
+ }
+ }
+
+ /**
* Translates a composite {@link AppliedPTransform} into a runner API proto with no component
* transforms.
*
@@ -206,14 +256,66 @@ public class PTransformTranslation {
}
/**
- * A translator consumes a {@link PTransform} application and produces the appropriate
- * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
+ * A bi-directional translator between a Java-based {@link PTransform} and a protobuf payload for
+ * that transform.
+ *
+ * <p>When going to a protocol buffer message, the translator produces a payload corresponding to
+ * the Java representation while registering components that payload references.
+ *
+ * <p>When "rehydrating" a protocol buffer message, the translator returns a {@link RawPTransform}
+ * - because the transform may not be Java-based, it is not possible to rebuild a Java-based
+ * {@link PTransform}. The resulting {@link RawPTransform} subclass encapsulates the knowledge of
+ * which components are referenced in the payload.
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
String getUrn(T transform);
FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
throws IOException;
+
+ RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException;
+
+ /**
+ * A {@link TransformPayloadTranslator} for transforms that contain no references to components,
+ * so they do not need a specialized rehydration.
+ */
+ abstract class WithDefaultRehydration<T extends PTransform<?, ?>>
+ implements TransformPayloadTranslator<T> {
+ @Override
+ public final RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException {
+ return UnknownRawPTransform.forSpec(protoTransform.getSpec());
+ }
+ }
+
+ /**
+ * A {@link TransformPayloadTranslator} for transforms that contain no references to components,
+ * so they do not need a specialized rehydration.
+ */
+ abstract class NotSerializable<T extends PTransform<?, ?>>
+ implements TransformPayloadTranslator<T> {
+ @Override
+ public final FunctionSpec translate(
+ AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s should never be translated",
+ transform.getTransform().getClass().getCanonicalName()));
+ }
+
+ @Override
+ public final RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.rehydrate should never be called; there is no serialized form",
+ getClass().getCanonicalName()));
+ }
+ }
}
/**
@@ -264,6 +366,43 @@ public class PTransformTranslation {
}
}
+ @AutoValue
+ abstract static class UnknownRawPTransform extends RawPTransform<PInput, POutput> {
+
+ @Override
+ public String getUrn() {
+ return getSpec() == null ? null : getSpec().getUrn();
+ }
+
+ @Nullable
+ public abstract RunnerApi.FunctionSpec getSpec();
+
+ public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) {
+ return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec);
+ }
+
+ @Override
+ public POutput expand(PInput input) {
+ throw new IllegalStateException(
+ String.format(
+ "%s should never be asked to expand;"
+ + " it is the result of deserializing an already-constructed Pipeline",
+ getClass().getSimpleName()));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("urn", getUrn())
+ .add("payload", getSpec())
+ .toString();
+ }
+
+ public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents components) {
+ return getSpec();
+ }
+ }
+
/** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
public static class RawPTransformTranslator
implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@@ -278,5 +417,11 @@ public class PTransformTranslation {
throws IOException {
return transform.getTransform().migrate(components);
}
+
+ @Override
+ public RawPTransform<?, ?> rehydrate(
+ RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) {
+ return UnknownRawPTransform.forSpec(protoTransform.getSpec());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 3886e47..5092448 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -79,29 +79,20 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
-/**
- * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
- */
+/** Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. */
public class ParDoTranslation {
- /**
- * The URN for an unknown Java {@link DoFn}.
- */
+ /** The URN for an unknown Java {@link DoFn}. */
public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
- /**
- * The URN for an unknown Java {@link ViewFn}.
- */
+ /** The URN for an unknown Java {@link ViewFn}. */
public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
- /**
- * The URN for an unknown Java {@link WindowMappingFn}.
- */
+ /** The URN for an unknown Java {@link WindowMappingFn}. */
public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
"urn:beam:windowmappingfn:javasdk:0.1";
- /**
- * A {@link TransformPayloadTranslator} for {@link ParDo}.
- */
+ /** A {@link TransformPayloadTranslator} for {@link ParDo}. */
public static class ParDoPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+ extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+ ParDo.MultiOutput<?, ?>> {
public static TransformPayloadTranslator create() {
return new ParDoPayloadTranslator();
}
@@ -124,9 +115,7 @@ public class ParDoTranslation {
.build();
}
- /**
- * Registers {@link ParDoPayloadTranslator}.
- */
+ /** Registers {@link ParDoPayloadTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@Override
@@ -134,11 +123,16 @@ public class ParDoTranslation {
getTransformPayloadTranslators() {
return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
- throws IOException {
+ throws IOException {
DoFn<?, ?> doFn = parDo.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
Map<String, StateDeclaration> states = signature.stateDeclarations();
@@ -158,13 +152,11 @@ public class ParDoTranslation {
}
}
for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
- RunnerApi.StateSpec spec =
- toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
+ RunnerApi.StateSpec spec = toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
builder.putStateSpecs(state.getKey(), spec);
}
for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
- RunnerApi.TimerSpec spec =
- toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
+ RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
builder.putTimerSpecs(timer.getKey(), spec);
}
return builder.build();
@@ -174,7 +166,8 @@ public class ParDoTranslation {
StateDeclaration stateDeclaration, DoFn<?, ?> target) {
try {
Object fieldValue = stateDeclaration.field().get(target);
- checkState(fieldValue instanceof StateSpec,
+ checkState(
+ fieldValue instanceof StateSpec,
"Malformed %s class %s: state declaration field %s does not have type %s.",
DoFn.class.getSimpleName(),
target.getClass().getName(),
@@ -196,7 +189,8 @@ public class ParDoTranslation {
TimerDeclaration timerDeclaration, DoFn<?, ?> target) {
try {
Object fieldValue = timerDeclaration.field().get(target);
- checkState(fieldValue instanceof TimerSpec,
+ checkState(
+ fieldValue instanceof TimerSpec,
"Malformed %s class %s: timer declaration field %s does not have type %s.",
DoFn.class.getSimpleName(),
target.getClass().getName(),
@@ -273,8 +267,7 @@ public class ParDoTranslation {
}
SdkComponents sdkComponents = SdkComponents.create();
- RunnerApi.PTransform parDoProto =
- PTransformTranslation.toProto(application, sdkComponents);
+ RunnerApi.PTransform parDoProto = PTransformTranslation.toProto(application, sdkComponents);
ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload());
List<PCollectionView<?>> views = new ArrayList<>();
@@ -289,12 +282,7 @@ public class ParDoTranslation {
"no input with tag %s",
sideInputTag);
views.add(
- viewFromProto(
- sideInput,
- sideInputTag,
- originalPCollection,
- parDoProto,
- components));
+ viewFromProto(sideInput, sideInputTag, originalPCollection, parDoProto, components));
}
return views;
}
@@ -414,7 +402,6 @@ public class ParDoTranslation {
default:
throw new IllegalArgumentException(
String.format("Unknown %s: %s", RunnerApi.StateSpec.class.getName(), stateSpec));
-
}
}
@@ -431,7 +418,7 @@ public class ParDoTranslation {
}
private static RunnerApi.TimeDomain.Enum toProto(TimeDomain timeDomain) {
- switch(timeDomain) {
+ switch (timeDomain) {
case EVENT_TIME:
return RunnerApi.TimeDomain.Enum.EVENT_TIME;
case PROCESSING_TIME:
@@ -445,12 +432,12 @@ public class ParDoTranslation {
@AutoValue
abstract static class DoFnAndMainOutput implements Serializable {
- public static DoFnAndMainOutput of(
- DoFn<?, ?> fn, TupleTag<?> tag) {
+ public static DoFnAndMainOutput of(DoFn<?, ?> fn, TupleTag<?> tag) {
return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag);
}
abstract DoFn<?, ?> getDoFn();
+
abstract TupleTag<?> getMainOutputTag();
}
@@ -475,8 +462,7 @@ public class ParDoTranslation {
FunctionSpec.class.getSimpleName(),
CUSTOM_JAVA_DO_FN_URN,
fnSpec.getSpec().getUrn());
- byte[] serializedFn =
- fnSpec.getSpec().getPayload().toByteArray();
+ byte[] serializedFn = fnSpec.getSpec().getPayload().toByteArray();
return (DoFnAndMainOutput)
SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
}
@@ -505,9 +491,7 @@ public class ParDoTranslation {
public static SideInput toProto(PCollectionView<?> view) {
Builder builder = SideInput.newBuilder();
builder.setAccessPattern(
- FunctionSpec.newBuilder()
- .setUrn(view.getViewFn().getMaterialization().getUrn())
- .build());
+ FunctionSpec.newBuilder().setUrn(view.getViewFn().getMaterialization().getUrn()).build());
builder.setViewFn(toProto(view.getViewFn()));
builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
return builder.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 1624865..85033e5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.auto.value.AutoValue;
-import com.google.common.base.MoreObjects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import java.io.IOException;
@@ -32,12 +30,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -47,8 +43,6 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -148,6 +142,8 @@ public class PipelineTranslation {
}
RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
+ RawPTransform<?, ?> transform =
+ PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
// By default, no "additional" inputs, since that is an SDK-specific thing.
// Only ParDo and WriteFiles really separate main from side inputs
@@ -170,20 +166,6 @@ public class PipelineTranslation {
transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
}
- // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
- List<Coder<?>> additionalCoders = Collections.emptyList();
- if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
- RunnerApi.CombinePayload payload =
- RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload());
- additionalCoders =
- (List)
- Collections.singletonList(
- rehydratedComponents.getCoder(payload.getAccumulatorCoderId()));
- }
-
- RehydratedPTransform transform =
- RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders);
-
if (isPrimitive(transformProto)) {
transforms.addFinalizedPrimitiveNode(
transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
@@ -232,58 +214,4 @@ public class PipelineTranslation {
.values()
.containsAll(transformProto.getOutputsMap().values());
}
-
- @AutoValue
- abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
-
- @Nullable
- public abstract RunnerApi.FunctionSpec getSpec();
-
- @Override
- public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
-
- public abstract List<Coder<?>> getCoders();
-
- @Override
- public String getUrn() {
- return getSpec().getUrn();
- }
-
- public static RehydratedPTransform of(
- RunnerApi.FunctionSpec payload,
- Map<TupleTag<?>, PValue> additionalInputs,
- List<Coder<?>> additionalCoders) {
- return new AutoValue_PipelineTranslation_RehydratedPTransform(
- payload, additionalInputs, additionalCoders);
- }
-
- @Override
- public POutput expand(PInput input) {
- throw new IllegalStateException(
- String.format(
- "%s should never be asked to expand;"
- + " it is the result of deserializing an already-constructed Pipeline",
- getClass().getSimpleName()));
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("urn", getUrn())
- .add("payload", getSpec())
- .toString();
- }
-
- @Override
- public RunnerApi.FunctionSpec migrate(SdkComponents components) {
- for (Coder<?> coder : getCoders()) {
- try {
- components.registerCoder(coder);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return getSpec();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index e9168a2..4b14c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -44,8 +44,8 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
/**
- * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransformTranslation} into {@link ReadPayload} protos.
+ * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} {@link PTransform
+ * PTransformTranslation} into {@link ReadPayload} protos.
*/
public class ReadTranslation {
private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
@@ -89,13 +89,9 @@ public class ReadTranslation {
public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
throws InvalidProtocolBufferException {
checkArgument(payload.getIsBounded().equals(IsBounded.Enum.BOUNDED));
- return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
- payload
- .getSource()
- .getSpec()
- .getPayload()
- .toByteArray(),
- "BoundedSource");
+ return (BoundedSource<?>)
+ SerializableUtils.deserializeFromByteArray(
+ payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
}
public static <T> BoundedSource<T> boundedSourceFromTransform(
@@ -136,13 +132,9 @@ public class ReadTranslation {
public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
throws InvalidProtocolBufferException {
checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED));
- return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
- payload
- .getSource()
- .getSpec()
- .getPayload()
- .toByteArray(),
- "BoundedSource");
+ return (UnboundedSource<?, ?>)
+ SerializableUtils.deserializeFromByteArray(
+ payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
}
public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {
@@ -161,11 +153,10 @@ public class ReadTranslation {
}
}
- /**
- * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
- */
+ /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */
public static class UnboundedReadPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
+ extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+ Read.Unbounded<?>> {
public static TransformPayloadTranslator create() {
return new UnboundedReadPayloadTranslator();
}
@@ -188,11 +179,10 @@ public class ReadTranslation {
}
}
- /**
- * A {@link TransformPayloadTranslator} for {@link Read.Bounded}.
- */
+ /** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */
public static class BoundedReadPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
+ extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+ Read.Bounded<?>> {
public static TransformPayloadTranslator create() {
return new BoundedReadPayloadTranslator();
}
@@ -226,5 +216,10 @@ public class ReadTranslation {
.put(Read.Bounded.class, new BoundedReadPayloadTranslator())
.build();
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index de4d6bb..8e4c1db 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -172,7 +172,8 @@ public class TestStreamTranslation {
}
}
- static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
+ static class TestStreamTranslator
+ extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> {
@Override
public String getUrn(TestStream<?> transform) {
return TEST_STREAM_TRANSFORM_URN;
@@ -197,5 +198,10 @@ public class TestStreamTranslation {
getTransformPayloadTranslators() {
return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index 3b3ffa1..58417a8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -26,4 +26,6 @@ import org.apache.beam.sdk.transforms.PTransform;
public interface TransformPayloadTranslatorRegistrar {
Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
getTransformPayloadTranslators();
+
+ Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index ad6177d..9158aba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -41,7 +41,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
*/
public class WindowIntoTranslation {
- static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> {
+ static class WindowAssignTranslator
+ extends TransformPayloadTranslator.WithDefaultRehydration<Window.Assign<?>> {
@Override
public String getUrn(Assign<?> transform) {
@@ -105,11 +106,10 @@ public class WindowIntoTranslation {
getWindowIntoPayload(application).getWindowFn());
}
- /**
- * A {@link TransformPayloadTranslator} for {@link Window}.
- */
+ /** A {@link TransformPayloadTranslator} for {@link Window}. */
public static class WindowIntoPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> {
+ extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+ Window.Assign<?>> {
public static TransformPayloadTranslator create() {
return new WindowIntoPayloadTranslator();
}
@@ -140,5 +140,10 @@ public class WindowIntoTranslation {
getTransformPayloadTranslators() {
return Collections.singletonMap(Window.Assign.class, new WindowIntoPayloadTranslator());
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 5a49747..645b562 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
@@ -173,10 +174,11 @@ public class WriteFilesTranslation {
.getPayload());
}
- static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
+ static class WriteFilesTranslator
+ extends TransformPayloadTranslator.WithDefaultRehydration<WriteFiles<?, ?, ?>> {
@Override
public String getUrn(WriteFiles<?, ?, ?> transform) {
- return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+ return WRITE_FILES_TRANSFORM_URN;
}
@Override
@@ -193,9 +195,15 @@ public class WriteFilesTranslation {
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@Override
- public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+ public Map<Class<? extends PTransform>, TransformPayloadTranslator>
getTransformPayloadTranslators() {
- return Collections.singletonMap(WriteFiles.class, new WriteFilesTranslator());
+ return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap(
+ WriteFiles.class, new WriteFilesTranslator());
+ }
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9cfa79f..099252f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -35,14 +35,13 @@ import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -115,6 +114,11 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
new SplittableParDoProcessElementsTranslator())
.build();
}
+
+ @Override
+ public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
/**
@@ -122,7 +126,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoProcessElementsTranslator
- implements TransformPayloadTranslator<ProcessElements<?, ?, ?, ?>> {
+ extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?>> {
private SplittableParDoProcessElementsTranslator() {}
@@ -130,14 +134,6 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
public String getUrn(ProcessElements<?, ?, ?, ?> transform) {
return SPLITTABLE_PROCESS_URN;
}
-
- @Override
- public FunctionSpec translate(
- AppliedPTransform<?, ?, ProcessElements<?, ?, ?, ?>> transform, SdkComponents components) {
- throw new UnsupportedOperationException(
- String.format("%s should never be translated",
- ProcessElements.class.getCanonicalName()));
- }
}
// the TransformEvaluatorFactories can construct instances of all generic types of transform,
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index e77dbc8..7840c32 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -219,11 +219,6 @@
<!-- Beam -->
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-model-pipeline</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<exclusions>
<exclusion>
http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d1e2d57..cec01f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -31,12 +31,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
@@ -56,7 +54,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -1085,7 +1082,7 @@ class FlinkStreamingTransformTranslators {
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoProcessElementsTranslator
- implements PTransformTranslation.TransformPayloadTranslator<
+ extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
private SplittableParDoProcessElementsTranslator() {}
@@ -1094,17 +1091,6 @@ class FlinkStreamingTransformTranslators {
public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
return SPLITTABLE_PROCESS_URN;
}
-
- @Override
- public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
- transform,
- SdkComponents components) {
- throw new UnsupportedOperationException(
- String.format(
- "%s should never be translated",
- SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
- }
}
/** Registers classes specialized to the Flink runner. */
@@ -1128,6 +1114,11 @@ class FlinkStreamingTransformTranslators {
new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
.build();
}
+
+ @Override
+ public Map<String, PTransformTranslation.TransformPayloadTranslator> getTransformRehydrators() {
+ return Collections.emptyMap();
+ }
}
/**
@@ -1135,7 +1126,7 @@ class FlinkStreamingTransformTranslators {
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoProcessElementsPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<
+ extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
private SplittableParDoProcessElementsPayloadTranslator() {}
@@ -1144,17 +1135,6 @@ class FlinkStreamingTransformTranslators {
public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
return SplittableParDo.SPLITTABLE_PROCESS_URN;
}
-
- @Override
- public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
- transform,
- SdkComponents components) {
- throw new UnsupportedOperationException(
- String.format(
- "%s should never be translated",
- SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
- }
}
/**
@@ -1162,7 +1142,7 @@ class FlinkStreamingTransformTranslators {
* once SDF is reorganized appropriately.
*/
private static class SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<
+ extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> {
private SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator() {}
@@ -1171,24 +1151,13 @@ class FlinkStreamingTransformTranslators {
public String getUrn(SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?> transform) {
return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
}
-
- @Override
- public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>>
- transform,
- SdkComponents components) {
- throw new UnsupportedOperationException(
- String.format(
- "%s should never be translated",
- SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class.getCanonicalName()));
- }
}
/**
* A translator just to vend the URN.
*/
private static class CreateStreamingFlinkViewPayloadTranslator
- implements PTransformTranslation.TransformPayloadTranslator<
+ extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> {
private CreateStreamingFlinkViewPayloadTranslator() {}
@@ -1197,16 +1166,5 @@ class FlinkStreamingTransformTranslators {
public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?> transform) {
return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
}
-
- @Override
- public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>>
- transform,
- SdkComponents components) {
- throw new UnsupportedOperationException(
- String.format(
- "%s should never be translated",
- CreateStreamingFlinkView.CreateFlinkPCollectionView.class.getCanonicalName()));
- }
}
}