You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:13 UTC

[05/14] beam git commit: Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform

Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10c63e15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10c63e15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10c63e15

Branch: refs/heads/master
Commit: 10c63e15ab51b885372f7b6251d8ace63bae0ad1
Parents: c14455e
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 19:25:28 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CombineTranslation.java   |  33 ++--
 .../CreatePCollectionViewTranslation.java       |   7 +-
 .../core/construction/FlattenTranslator.java    |   8 +-
 .../construction/GroupByKeyTranslation.java     |  13 +-
 .../construction/PTransformTranslation.java     | 155 ++++++++++++++++++-
 .../core/construction/ParDoTranslation.java     |  70 ++++-----
 .../core/construction/PipelineTranslation.java  |  76 +--------
 .../core/construction/ReadTranslation.java      |  43 +++--
 .../construction/TestStreamTranslation.java     |   8 +-
 .../TransformPayloadTranslatorRegistrar.java    |   2 +
 .../construction/WindowIntoTranslation.java     |  15 +-
 .../construction/WriteFilesTranslation.java     |  16 +-
 .../direct/TransformEvaluatorRegistry.java      |  18 +--
 runners/flink/pom.xml                           |   5 -
 .../FlinkStreamingTransformTranslators.java     |  60 ++-----
 15 files changed, 280 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index c3d9553..69591ee 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -54,11 +54,10 @@ import org.apache.beam.sdk.values.PCollection;
 public class CombineTranslation {
   public static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:combinefn:javasdk:v1";
 
-   /**
-   * A {@link TransformPayloadTranslator} for {@link Combine.PerKey}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */
   public static class CombinePayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Combine.PerKey<?, ?, ?>> {
     public static TransformPayloadTranslator create() {
       return new CombinePayloadTranslator();
     }
@@ -81,9 +80,7 @@ public class CombineTranslation {
           .build();
     }
 
-    /**
-     * Registers {@link CombinePayloadTranslator}.
-     */
+    /** Registers {@link CombinePayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements TransformPayloadTranslatorRegistrar {
       @Override
@@ -91,6 +88,11 @@ public class CombineTranslation {
           getTransformPayloadTranslators() {
         return Collections.singletonMap(Combine.PerKey.class, new CombinePayloadTranslator());
       }
+
+      @Override
+      public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.emptyMap();
+      }
     }
   }
 
@@ -136,8 +138,7 @@ public class CombineTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
-                .setPayload(
-                    ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
+                .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
                 .build())
         .build();
   }
@@ -148,8 +149,8 @@ public class CombineTranslation {
     return components.getCoder(id);
   }
 
-  public static Coder<?> getAccumulatorCoder(
-      AppliedPTransform<?, ?, ?> transform) throws IOException {
+  public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> transform)
+      throws IOException {
     SdkComponents sdkComponents = SdkComponents.create();
     String id = getCombinePayload(transform, sdkComponents).getAccumulatorCoderId();
     Components components = sdkComponents.toComponents();
@@ -157,17 +158,11 @@ public class CombineTranslation {
         components.getCodersOrThrow(id), RehydratedComponents.forComponents(components));
   }
 
-  public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload)
-      throws IOException {
+  public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) throws IOException {
     checkArgument(payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN));
     return (GlobalCombineFn<?, ?, ?>)
         SerializableUtils.deserializeFromByteArray(
-            payload
-                .getCombineFn()
-                .getSpec()
-                .getPayload()
-                .toByteArray(),
-            "CombineFn");
+            payload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
   }
 
   public static GlobalCombineFn<?, ?, ?> getCombineFn(AppliedPTransform<?, ?, ?> transform)

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index 4b8edcf..709cb8a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -88,7 +88,7 @@ public class CreatePCollectionViewTranslation {
    */
   @Deprecated
   static class CreatePCollectionViewTranslator
-      implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
+      extends TransformPayloadTranslator.WithDefaultRehydration<View.CreatePCollectionView<?, ?>> {
     @Override
     public String getUrn(View.CreatePCollectionView<?, ?> transform) {
       return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
@@ -122,5 +122,10 @@ public class CreatePCollectionViewTranslation {
       return Collections.singletonMap(
           View.CreatePCollectionView.class, new CreatePCollectionViewTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
index c9798e6..972c453 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
@@ -32,7 +32,8 @@ import org.apache.beam.sdk.transforms.windowing.Window.Assign;
 /**
  * Utility methods for translating a {@link Assign} to and from {@link RunnerApi} representations.
  */
-public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCollections<?>> {
+public class FlattenTranslator
+    extends TransformPayloadTranslator.WithDefaultRehydration<Flatten.PCollections<?>> {
 
   public static TransformPayloadTranslator create() {
     return new FlattenTranslator();
@@ -59,5 +60,10 @@ public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCo
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Flatten.PCollections.class, new FlattenTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
index 840bae2..0803ad3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
@@ -34,7 +34,8 @@ import org.apache.beam.sdk.transforms.PTransform;
  */
 public class GroupByKeyTranslation {
 
-  static class GroupByKeyTranslator implements TransformPayloadTranslator<GroupByKey<?, ?>> {
+  static class GroupByKeyTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<GroupByKey<?, ?>> {
     @Override
     public String getUrn(GroupByKey<?, ?> transform) {
       return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
@@ -43,13 +44,10 @@ public class GroupByKeyTranslation {
     @Override
     public FunctionSpec translate(
         AppliedPTransform<?, ?, GroupByKey<?, ?>> transform, SdkComponents components) {
-      return FunctionSpec.newBuilder()
-          .setUrn(getUrn(transform.getTransform()))
-          .build();
+      return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
     }
   }
 
-
   /** Registers {@link GroupByKeyTranslator}. */
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -58,5 +56,10 @@ public class GroupByKeyTranslation {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(GroupByKey.class, new GroupByKeyTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 31767a0..785b9e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -20,7 +20,9 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Joiner;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.io.IOException;
@@ -74,6 +76,12 @@ public class PTransformTranslation {
   private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
       KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
 
+  private static final Map<String, TransformPayloadTranslator> KNOWN_REHYDRATORS =
+      loadTransformRehydrators();
+
+  private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR =
+      new RawPTransformTranslator();
+
   private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
       loadTransformPayloadTranslators() {
     HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>();
@@ -98,6 +106,29 @@ public class PTransformTranslation {
     return ImmutableMap.copyOf(translators);
   }
 
+  private static Map<String, TransformPayloadTranslator> loadTransformRehydrators() {
+    HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>();
+
+    for (TransformPayloadTranslatorRegistrar registrar :
+        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+      Map<String, ? extends TransformPayloadTranslator> newRehydrators =
+          registrar.getTransformRehydrators();
+
+      Set<String> alreadyRegistered =
+          Sets.intersection(rehydrators.keySet(), newRehydrators.keySet());
+
+      if (!alreadyRegistered.isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "URNs already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
+      }
+
+      rehydrators.putAll(newRehydrators);
+    }
+    return ImmutableMap.copyOf(rehydrators);
+  }
+
   private PTransformTranslation() {}
 
   /**
@@ -150,17 +181,36 @@ public class PTransformTranslation {
       // context of our current serialization
       transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components));
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
-      FunctionSpec payload =
+      transformBuilder.setSpec(
           KNOWN_PAYLOAD_TRANSLATORS
               .get(transform.getClass())
-              .translate(appliedPTransform, components);
-      transformBuilder.setSpec(payload);
+              .translate(appliedPTransform, components));
     }
 
     return transformBuilder.build();
   }
 
   /**
+   * Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform} specialized for the URN
+   * and spec.
+   */
+  static RawPTransform<?, ?> rehydrate(
+      RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+      throws IOException {
+
+    @Nullable
+    TransformPayloadTranslator<?> rehydrator =
+        KNOWN_REHYDRATORS.get(
+            protoTransform.getSpec() == null ? null : protoTransform.getSpec().getUrn());
+
+    if (rehydrator == null) {
+      return DEFAULT_REHYDRATOR.rehydrate(protoTransform, rehydratedComponents);
+    } else {
+      return rehydrator.rehydrate(protoTransform, rehydratedComponents);
+    }
+  }
+
+  /**
    * Translates a composite {@link AppliedPTransform} into a runner API proto with no component
    * transforms.
    *
@@ -206,14 +256,66 @@ public class PTransformTranslation {
   }
 
   /**
-   * A translator consumes a {@link PTransform} application and produces the appropriate
-   * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
+   * A bi-directional translator between a Java-based {@link PTransform} and a protobuf payload for
+   * that transform.
+   *
+   * <p>When going to a protocol buffer message, the translator produces a payload corresponding to
+   * the Java representation while registering components that payload references.
+   *
+   * <p>When "rehydrating" a protocol buffer message, the translator returns a {@link RawPTransform}
+   * - because the transform may not be Java-based, it is not possible to rebuild a Java-based
+   * {@link PTransform}. The resulting {@link RawPTransform} subclass encapsulates the knowledge of
+   * which components are referenced in the payload.
    */
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
     String getUrn(T transform);
 
     FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
         throws IOException;
+
+    RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException;
+
+    /**
+     * A {@link TransformPayloadTranslator} for transforms that contain no references to components,
+     * so they do not need a specialized rehydration.
+     */
+    abstract class WithDefaultRehydration<T extends PTransform<?, ?>>
+        implements TransformPayloadTranslator<T> {
+      @Override
+      public final RawPTransform<?, ?> rehydrate(
+          RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+          throws IOException {
+        return UnknownRawPTransform.forSpec(protoTransform.getSpec());
+      }
+    }
+
+    /**
+     * A {@link TransformPayloadTranslator} for transforms that contain no references to components,
+     * so they do not need a specialized rehydration.
+     */
+    abstract class NotSerializable<T extends PTransform<?, ?>>
+        implements TransformPayloadTranslator<T> {
+      @Override
+      public final FunctionSpec translate(
+          AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException {
+        throw new UnsupportedOperationException(
+            String.format(
+                "%s should never be translated",
+                transform.getTransform().getClass().getCanonicalName()));
+      }
+
+      @Override
+      public final RawPTransform<?, ?> rehydrate(
+          RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+          throws IOException {
+        throw new UnsupportedOperationException(
+            String.format(
+                "%s.rehydrate should never be called; there is no serialized form",
+                getClass().getCanonicalName()));
+      }
+    }
   }
 
   /**
@@ -264,6 +366,43 @@ public class PTransformTranslation {
     }
   }
 
+  @AutoValue
+  abstract static class UnknownRawPTransform extends RawPTransform<PInput, POutput> {
+
+    @Override
+    public String getUrn() {
+      return getSpec() == null ? null : getSpec().getUrn();
+    }
+
+    @Nullable
+    public abstract RunnerApi.FunctionSpec getSpec();
+
+    public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) {
+      return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec);
+    }
+
+    @Override
+    public POutput expand(PInput input) {
+      throw new IllegalStateException(
+          String.format(
+              "%s should never be asked to expand;"
+                  + " it is the result of deserializing an already-constructed Pipeline",
+              getClass().getSimpleName()));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("urn", getUrn())
+          .add("payload", getSpec())
+          .toString();
+    }
+
+    public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents components) {
+      return getSpec();
+    }
+  }
+
   /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
   public static class RawPTransformTranslator
       implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@@ -278,5 +417,11 @@ public class PTransformTranslation {
         throws IOException {
       return transform.getTransform().migrate(components);
     }
+
+    @Override
+    public RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) {
+      return UnknownRawPTransform.forSpec(protoTransform.getSpec());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 3886e47..5092448 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -79,29 +79,20 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
-/**
- * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
- */
+/** Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. */
 public class ParDoTranslation {
-  /**
-   * The URN for an unknown Java {@link DoFn}.
-   */
+  /** The URN for an unknown Java {@link DoFn}. */
   public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
-  /**
-   * The URN for an unknown Java {@link ViewFn}.
-   */
+  /** The URN for an unknown Java {@link ViewFn}. */
   public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
-  /**
-   * The URN for an unknown Java {@link WindowMappingFn}.
-   */
+  /** The URN for an unknown Java {@link WindowMappingFn}. */
   public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
       "urn:beam:windowmappingfn:javasdk:0.1";
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link ParDo}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link ParDo}. */
   public static class ParDoPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          ParDo.MultiOutput<?, ?>> {
     public static TransformPayloadTranslator create() {
       return new ParDoPayloadTranslator();
     }
@@ -124,9 +115,7 @@ public class ParDoTranslation {
           .build();
     }
 
-    /**
-     * Registers {@link ParDoPayloadTranslator}.
-     */
+    /** Registers {@link ParDoPayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements TransformPayloadTranslatorRegistrar {
       @Override
@@ -134,11 +123,16 @@ public class ParDoTranslation {
           getTransformPayloadTranslators() {
         return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
       }
+
+      @Override
+      public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.emptyMap();
+      }
     }
   }
 
   public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
-  throws IOException {
+      throws IOException {
     DoFn<?, ?> doFn = parDo.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     Map<String, StateDeclaration> states = signature.stateDeclarations();
@@ -158,13 +152,11 @@ public class ParDoTranslation {
       }
     }
     for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
-      RunnerApi.StateSpec spec =
-          toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
+      RunnerApi.StateSpec spec = toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
       builder.putStateSpecs(state.getKey(), spec);
     }
     for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
-      RunnerApi.TimerSpec spec =
-          toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
+      RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
       builder.putTimerSpecs(timer.getKey(), spec);
     }
     return builder.build();
@@ -174,7 +166,8 @@ public class ParDoTranslation {
       StateDeclaration stateDeclaration, DoFn<?, ?> target) {
     try {
       Object fieldValue = stateDeclaration.field().get(target);
-      checkState(fieldValue instanceof StateSpec,
+      checkState(
+          fieldValue instanceof StateSpec,
           "Malformed %s class %s: state declaration field %s does not have type %s.",
           DoFn.class.getSimpleName(),
           target.getClass().getName(),
@@ -196,7 +189,8 @@ public class ParDoTranslation {
       TimerDeclaration timerDeclaration, DoFn<?, ?> target) {
     try {
       Object fieldValue = timerDeclaration.field().get(target);
-      checkState(fieldValue instanceof TimerSpec,
+      checkState(
+          fieldValue instanceof TimerSpec,
           "Malformed %s class %s: timer declaration field %s does not have type %s.",
           DoFn.class.getSimpleName(),
           target.getClass().getName(),
@@ -273,8 +267,7 @@ public class ParDoTranslation {
     }
 
     SdkComponents sdkComponents = SdkComponents.create();
-    RunnerApi.PTransform parDoProto =
-        PTransformTranslation.toProto(application, sdkComponents);
+    RunnerApi.PTransform parDoProto = PTransformTranslation.toProto(application, sdkComponents);
     ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload());
 
     List<PCollectionView<?>> views = new ArrayList<>();
@@ -289,12 +282,7 @@ public class ParDoTranslation {
               "no input with tag %s",
               sideInputTag);
       views.add(
-          viewFromProto(
-              sideInput,
-              sideInputTag,
-              originalPCollection,
-              parDoProto,
-              components));
+          viewFromProto(sideInput, sideInputTag, originalPCollection, parDoProto, components));
     }
     return views;
   }
@@ -414,7 +402,6 @@ public class ParDoTranslation {
       default:
         throw new IllegalArgumentException(
             String.format("Unknown %s: %s", RunnerApi.StateSpec.class.getName(), stateSpec));
-
     }
   }
 
@@ -431,7 +418,7 @@ public class ParDoTranslation {
   }
 
   private static RunnerApi.TimeDomain.Enum toProto(TimeDomain timeDomain) {
-    switch(timeDomain) {
+    switch (timeDomain) {
       case EVENT_TIME:
         return RunnerApi.TimeDomain.Enum.EVENT_TIME;
       case PROCESSING_TIME:
@@ -445,12 +432,12 @@ public class ParDoTranslation {
 
   @AutoValue
   abstract static class DoFnAndMainOutput implements Serializable {
-    public static DoFnAndMainOutput of(
-        DoFn<?, ?> fn, TupleTag<?> tag) {
+    public static DoFnAndMainOutput of(DoFn<?, ?> fn, TupleTag<?> tag) {
       return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag);
     }
 
     abstract DoFn<?, ?> getDoFn();
+
     abstract TupleTag<?> getMainOutputTag();
   }
 
@@ -475,8 +462,7 @@ public class ParDoTranslation {
         FunctionSpec.class.getSimpleName(),
         CUSTOM_JAVA_DO_FN_URN,
         fnSpec.getSpec().getUrn());
-    byte[] serializedFn =
-        fnSpec.getSpec().getPayload().toByteArray();
+    byte[] serializedFn = fnSpec.getSpec().getPayload().toByteArray();
     return (DoFnAndMainOutput)
         SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
   }
@@ -505,9 +491,7 @@ public class ParDoTranslation {
   public static SideInput toProto(PCollectionView<?> view) {
     Builder builder = SideInput.newBuilder();
     builder.setAccessPattern(
-        FunctionSpec.newBuilder()
-            .setUrn(view.getViewFn().getMaterialization().getUrn())
-            .build());
+        FunctionSpec.newBuilder().setUrn(view.getViewFn().getMaterialization().getUrn()).build());
     builder.setViewFn(toProto(view.getViewFn()));
     builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
     return builder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 1624865..85033e5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.auto.value.AutoValue;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import java.io.IOException;
@@ -32,12 +30,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -47,8 +43,6 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -148,6 +142,8 @@ public class PipelineTranslation {
     }
 
     RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
+    RawPTransform<?, ?> transform =
+        PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
 
     // By default, no "additional" inputs, since that is an SDK-specific thing.
     // Only ParDo and WriteFiles really separate main from side inputs
@@ -170,20 +166,6 @@ public class PipelineTranslation {
               transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
     }
 
-    // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
-    List<Coder<?>> additionalCoders = Collections.emptyList();
-    if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
-      RunnerApi.CombinePayload payload =
-          RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload());
-      additionalCoders =
-          (List)
-              Collections.singletonList(
-                  rehydratedComponents.getCoder(payload.getAccumulatorCoderId()));
-    }
-
-    RehydratedPTransform transform =
-        RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders);
-
     if (isPrimitive(transformProto)) {
       transforms.addFinalizedPrimitiveNode(
           transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
@@ -232,58 +214,4 @@ public class PipelineTranslation {
             .values()
             .containsAll(transformProto.getOutputsMap().values());
   }
-
-  @AutoValue
-  abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
-
-    @Nullable
-    public abstract RunnerApi.FunctionSpec getSpec();
-
-    @Override
-    public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
-
-    public abstract List<Coder<?>> getCoders();
-
-    @Override
-    public String getUrn() {
-      return getSpec().getUrn();
-    }
-
-    public static RehydratedPTransform of(
-        RunnerApi.FunctionSpec payload,
-        Map<TupleTag<?>, PValue> additionalInputs,
-        List<Coder<?>> additionalCoders) {
-      return new AutoValue_PipelineTranslation_RehydratedPTransform(
-          payload, additionalInputs, additionalCoders);
-    }
-
-    @Override
-    public POutput expand(PInput input) {
-      throw new IllegalStateException(
-          String.format(
-              "%s should never be asked to expand;"
-                  + " it is the result of deserializing an already-constructed Pipeline",
-              getClass().getSimpleName()));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("urn", getUrn())
-          .add("payload", getSpec())
-          .toString();
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec migrate(SdkComponents components) {
-      for (Coder<?> coder : getCoders()) {
-        try {
-          components.registerCoder(coder);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      return getSpec();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index e9168a2..4b14c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -44,8 +44,8 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransformTranslation} into {@link ReadPayload} protos.
+ * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} {@link PTransform
+ * PTransformTranslation} into {@link ReadPayload} protos.
  */
 public class ReadTranslation {
   private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
@@ -89,13 +89,9 @@ public class ReadTranslation {
   public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
       throws InvalidProtocolBufferException {
     checkArgument(payload.getIsBounded().equals(IsBounded.Enum.BOUNDED));
-    return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
-        payload
-            .getSource()
-            .getSpec()
-            .getPayload()
-            .toByteArray(),
-        "BoundedSource");
+    return (BoundedSource<?>)
+        SerializableUtils.deserializeFromByteArray(
+            payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
   }
 
   public static <T> BoundedSource<T> boundedSourceFromTransform(
@@ -136,13 +132,9 @@ public class ReadTranslation {
   public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
       throws InvalidProtocolBufferException {
     checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED));
-    return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
-        payload
-            .getSource()
-            .getSpec()
-            .getPayload()
-            .toByteArray(),
-        "BoundedSource");
+    return (UnboundedSource<?, ?>)
+        SerializableUtils.deserializeFromByteArray(
+            payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
   }
 
   public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {
@@ -161,11 +153,10 @@ public class ReadTranslation {
     }
   }
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */
   public static class UnboundedReadPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Read.Unbounded<?>> {
     public static TransformPayloadTranslator create() {
       return new UnboundedReadPayloadTranslator();
     }
@@ -188,11 +179,10 @@ public class ReadTranslation {
     }
   }
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link Read.Bounded}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */
   public static class BoundedReadPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Read.Bounded<?>> {
     public static TransformPayloadTranslator create() {
       return new BoundedReadPayloadTranslator();
     }
@@ -226,5 +216,10 @@ public class ReadTranslation {
           .put(Read.Bounded.class, new BoundedReadPayloadTranslator())
           .build();
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index de4d6bb..8e4c1db 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -172,7 +172,8 @@ public class TestStreamTranslation {
     }
   }
 
-  static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
+  static class TestStreamTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
       return TEST_STREAM_TRANSFORM_URN;
@@ -197,5 +198,10 @@ public class TestStreamTranslation {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index 3b3ffa1..58417a8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -26,4 +26,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 public interface TransformPayloadTranslatorRegistrar {
   Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
       getTransformPayloadTranslators();
+
+  Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index ad6177d..9158aba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -41,7 +41,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
  */
 public class WindowIntoTranslation {
 
-  static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> {
+  static class WindowAssignTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<Window.Assign<?>> {
 
     @Override
     public String getUrn(Assign<?> transform) {
@@ -105,11 +106,10 @@ public class WindowIntoTranslation {
         getWindowIntoPayload(application).getWindowFn());
   }
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link Window}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Window}. */
   public static class WindowIntoPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Window.Assign<?>> {
     public static TransformPayloadTranslator create() {
       return new WindowIntoPayloadTranslator();
     }
@@ -140,5 +140,10 @@ public class WindowIntoTranslation {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Window.Assign.class, new WindowIntoPayloadTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 5a49747..645b562 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
@@ -173,10 +174,11 @@ public class WriteFilesTranslation {
             .getPayload());
   }
 
-  static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
+  static class WriteFilesTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<WriteFiles<?, ?, ?>> {
     @Override
     public String getUrn(WriteFiles<?, ?, ?> transform) {
-      return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+      return WRITE_FILES_TRANSFORM_URN;
     }
 
     @Override
@@ -193,9 +195,15 @@ public class WriteFilesTranslation {
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Registrar implements TransformPayloadTranslatorRegistrar {
     @Override
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+    public Map<Class<? extends PTransform>, TransformPayloadTranslator>
         getTransformPayloadTranslators() {
-      return Collections.singletonMap(WriteFiles.class, new WriteFilesTranslator());
+      return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap(
+          WriteFiles.class, new WriteFilesTranslator());
+    }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9cfa79f..099252f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -35,14 +35,13 @@ import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -115,6 +114,11 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
               new SplittableParDoProcessElementsTranslator())
           .build();
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 
   /**
@@ -122,7 +126,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoProcessElementsTranslator
-      implements TransformPayloadTranslator<ProcessElements<?, ?, ?, ?>> {
+      extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?>> {
 
     private SplittableParDoProcessElementsTranslator() {}
 
@@ -130,14 +134,6 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
     public String getUrn(ProcessElements<?, ?, ?, ?> transform) {
       return SPLITTABLE_PROCESS_URN;
     }
-
-    @Override
-    public FunctionSpec translate(
-        AppliedPTransform<?, ?, ProcessElements<?, ?, ?, ?>> transform, SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format("%s should never be translated",
-          ProcessElements.class.getCanonicalName()));
-    }
   }
 
   // the TransformEvaluatorFactories can construct instances of all generic types of transform,

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index e77dbc8..7840c32 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -219,11 +219,6 @@
     <!-- Beam -->
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-model-pipeline</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d1e2d57..cec01f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -31,12 +31,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
@@ -56,7 +54,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -1085,7 +1082,7 @@ class FlinkStreamingTransformTranslators {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoProcessElementsTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
       SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
 
     private SplittableParDoProcessElementsTranslator() {}
@@ -1094,17 +1091,6 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
       return SPLITTABLE_PROCESS_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
-    }
   }
 
   /** Registers classes specialized to the Flink runner. */
@@ -1128,6 +1114,11 @@ class FlinkStreamingTransformTranslators {
               new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
           .build();
     }
+
+    @Override
+    public Map<String, PTransformTranslation.TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 
   /**
@@ -1135,7 +1126,7 @@ class FlinkStreamingTransformTranslators {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoProcessElementsPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
       SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
 
     private SplittableParDoProcessElementsPayloadTranslator() {}
@@ -1144,17 +1135,6 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
       return SplittableParDo.SPLITTABLE_PROCESS_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
-    }
   }
 
   /**
@@ -1162,7 +1142,7 @@ class FlinkStreamingTransformTranslators {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
       SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> {
 
     private SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator() {}
@@ -1171,24 +1151,13 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?> transform) {
       return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class.getCanonicalName()));
-    }
   }
 
   /**
    * A translator just to vend the URN.
    */
   private static class CreateStreamingFlinkViewPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
           CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> {
 
     private CreateStreamingFlinkViewPayloadTranslator() {}
@@ -1197,16 +1166,5 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?> transform) {
       return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              CreateStreamingFlinkView.CreateFlinkPCollectionView.class.getCanonicalName()));
-    }
   }
 }