You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:09 UTC

[01/14] beam git commit: Clearer getOrDefault style in RehydratedComponents

Repository: beam
Updated Branches:
  refs/heads/master 0f7736dff -> de7cc05cc


Clearer getOrDefault style in RehydratedComponents


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01103c2c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01103c2c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01103c2c

Branch: refs/heads/master
Commit: 01103c2cfdb8976fcf086a4d18f050a9fda41d1a
Parents: d684ca0
Author: Kenneth Knowles <ke...@apache.org>
Authored: Tue Oct 17 12:42:05 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/RehydratedComponents.java     | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/01103c2c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index fdb6cea..09457a3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -71,13 +71,8 @@ public class RehydratedComponents {
               new CacheLoader<String, Coder<?>>() {
                 @Override
                 public Coder<?> load(String id) throws Exception {
-                  RunnerApi.Coder coder;
-                  try {
-                    coder = components.getCodersOrThrow(id);
-                  } catch (IllegalArgumentException exc) {
-                    throw new IllegalStateException(
-                        String.format("No coder with id '%s' in serialized components", id), exc);
-                  }
+                  @Nullable RunnerApi.Coder coder = components.getCodersOrDefault(id, null);
+                  checkState(coder != null, "No coder with id '%s' in serialized components", id);
                   return CoderTranslation.fromProto(coder, RehydratedComponents.this);
                 }
               });


[12/14] beam git commit: Better error message for RehydratedComponents.getCoder

Posted by ke...@apache.org.
Better error message for RehydratedComponents.getCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d99d07a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d99d07a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d99d07a

Branch: refs/heads/master
Commit: 3d99d07a2b580fca99545c622f53b7378d1bb61a
Parents: 187beae
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 3 15:51:03 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/RehydratedComponents.java   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3d99d07a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index 2e12603..fdb6cea 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -71,8 +71,14 @@ public class RehydratedComponents {
               new CacheLoader<String, Coder<?>>() {
                 @Override
                 public Coder<?> load(String id) throws Exception {
-                  return CoderTranslation.fromProto(
-                      components.getCodersOrThrow(id), RehydratedComponents.this);
+                  RunnerApi.Coder coder;
+                  try {
+                    coder = components.getCodersOrThrow(id);
+                  } catch (IllegalArgumentException exc) {
+                    throw new IllegalStateException(
+                        String.format("No coder with id '%s' in serialized components", id), exc);
+                  }
+                  return CoderTranslation.fromProto(coder, RehydratedComponents.this);
                 }
               });
 


[03/14] beam git commit: Reinstate proto round trip in Java DirectRunner

Posted by ke...@apache.org.
Reinstate proto round trip in Java DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0cb28cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0cb28cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0cb28cc

Branch: refs/heads/master
Commit: c0cb28cc30733f561d4cc6155be5738584956ebd
Parents: 3d99d07
Author: Kenn Knowles <ke...@kennknowles.com>
Authored: Sat Sep 30 10:30:20 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectOptions.java    |  8 --------
 .../org/apache/beam/runners/direct/DirectRunner.java | 15 ++++++---------
 2 files changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c0cb28cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index af67306..574ab46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -76,10 +74,4 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
       return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
     }
   }
-
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Default.Boolean(false)
-  @Description("Control whether toProto/fromProto translations are applied to original Pipeline")
-  boolean isProtoTranslation();
-  void setProtoTranslation(boolean b);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0cb28cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 35d55b1..d041a5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,6 +29,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -160,15 +161,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline originalPipeline) {
     Pipeline pipeline;
-    if (getPipelineOptions().isProtoTranslation()) {
-      try {
-        pipeline = PipelineTranslation.fromProto(
-            PipelineTranslation.toProto(originalPipeline));
-      } catch (IOException exception) {
-        throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
-      }
-    } else {
-      pipeline = originalPipeline;
+    try {
+      RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(originalPipeline);
+      pipeline = PipelineTranslation.fromProto(protoPipeline);
+    } catch (IOException exception) {
+      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
     }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);


[14/14] beam git commit: This closes #3938: [BEAM-2674] Add custom rehydration; reinstate proto roundtrip for Java DirectRunner

Posted by ke...@apache.org.
This closes #3938: [BEAM-2674] Add custom rehydration; reinstate proto roundtrip for Java DirectRunner

  DirectRunner: Replace use of RawPTransform with NotSerializable.forUrn translators
  Clearer getOrDefault style in RehydratedComponents
  Add NotSerializable.forUrn to key by URN for non-serializable overrides
  Support side inputs in CombineTranslation
  Fix typo in UnboundedSource deserialization error message
  Reinstate proto round trip in Java DirectRunner
  Better error message for RehydratedComponents.getCoder
  Add custom rehydration for WriteFiles
  Add custom rehydration for ParDo
  Add custom rehydration for Combine
  Add custom rehydration for TestStream
  Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform
  Add RawPTransform.migrate(SdkComponents) for re-serialization


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de7cc05c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de7cc05c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de7cc05c

Branch: refs/heads/master
Commit: de7cc05cc67d1aa6331cddc17c2e02ed0efbe37d
Parents: 0f7736d 505021e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 17 13:48:57 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 17 13:48:57 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CombineTranslation.java   | 192 +++++++++++--
 .../CreatePCollectionViewTranslation.java       |   7 +-
 .../core/construction/FlattenTranslator.java    |   8 +-
 .../construction/GroupByKeyTranslation.java     |  13 +-
 .../construction/PTransformTranslation.java     | 261 +++++++++++++----
 .../core/construction/ParDoTranslation.java     | 282 ++++++++++++++-----
 .../core/construction/PipelineTranslation.java  | 110 +-------
 .../core/construction/ReadTranslation.java      |  43 ++-
 .../core/construction/RehydratedComponents.java |   5 +-
 .../core/construction/SplittableParDo.java      |   8 +
 .../construction/TestStreamTranslation.java     | 165 +++++++++--
 .../TransformPayloadTranslatorRegistrar.java    |   2 +
 .../construction/WindowIntoTranslation.java     |  15 +-
 .../construction/WriteFilesTranslation.java     | 172 +++++++++--
 .../construction/CombineTranslationTest.java    |  16 +-
 .../construction/TestStreamTranslationTest.java |   4 +-
 .../construction/WriteFilesTranslationTest.java |   3 +-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   7 +
 .../beam/runners/direct/DirectGroupByKey.java   |  16 +-
 .../beam/runners/direct/DirectOptions.java      |   8 -
 .../beam/runners/direct/DirectRunner.java       |  15 +-
 .../beam/runners/direct/MultiStepCombine.java   |  18 +-
 .../direct/ParDoMultiOverrideFactory.java       |   9 +-
 .../direct/TestStreamEvaluatorFactory.java      |   9 +-
 .../direct/TransformEvaluatorRegistry.java      |  32 +--
 .../runners/direct/ViewOverrideFactory.java     |   8 +-
 runners/flink/pom.xml                           |   5 -
 .../FlinkStreamingTransformTranslators.java     |  60 +---
 28 files changed, 1017 insertions(+), 476 deletions(-)
----------------------------------------------------------------------



[08/14] beam git commit: Add custom rehydration for ParDo

Posted by ke...@apache.org.
Add custom rehydration for ParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fb3e793
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fb3e793
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fb3e793

Branch: refs/heads/master
Commit: 7fb3e79328e1a9ef8340170aecd44c89e596eec5
Parents: 92209c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 3 19:17:48 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 226 ++++++++++++++++---
 .../core/construction/PipelineTranslation.java  |  22 --
 2 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7fb3e793/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5092448..f88cbe5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -26,6 +26,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.PA
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,6 +77,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -91,8 +94,7 @@ public class ParDoTranslation {
 
   /** A {@link TransformPayloadTranslator} for {@link ParDo}. */
   public static class ParDoPayloadTranslator
-      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
-          ParDo.MultiOutput<?, ?>> {
+      implements TransformPayloadTranslator<MultiOutput<?, ?>> {
     public static TransformPayloadTranslator create() {
       return new ParDoPayloadTranslator();
     }
@@ -115,6 +117,13 @@ public class ParDoTranslation {
           .build();
     }
 
+    @Override
+    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      return new RawParDo<>(protoTransform, rehydratedComponents);
+    }
+
     /** Registers {@link ParDoPayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -125,41 +134,76 @@ public class ParDoTranslation {
       }
 
       @Override
-      public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-        return Collections.emptyMap();
+      public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.singletonMap(PAR_DO_TRANSFORM_URN, new ParDoPayloadTranslator());
       }
     }
   }
 
-  public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
+  public static ParDoPayload toProto(final ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
       throws IOException {
-    DoFn<?, ?> doFn = parDo.getFn();
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    Map<String, StateDeclaration> states = signature.stateDeclarations();
-    Map<String, TimerDeclaration> timers = signature.timerDeclarations();
-    List<Parameter> parameters = signature.processElement().extraParameters();
-
-    ParDoPayload.Builder builder = ParDoPayload.newBuilder();
-    builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag()));
-    builder.setSplittable(signature.processElement().isSplittable());
-    for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
-      builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput));
-    }
-    for (Parameter parameter : parameters) {
-      Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
-      if (protoParameter.isPresent()) {
-        builder.addParameters(protoParameter.get());
-      }
-    }
-    for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
-      RunnerApi.StateSpec spec = toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
-      builder.putStateSpecs(state.getKey(), spec);
-    }
-    for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
-      RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
-      builder.putTimerSpecs(timer.getKey(), spec);
-    }
-    return builder.build();
+
+    final DoFn<?, ?> doFn = parDo.getFn();
+    final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    return payloadForParDoLike(
+        new ParDoLike() {
+          @Override
+          public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
+            return toProto(parDo.getFn(), parDo.getMainOutputTag());
+          }
+
+          @Override
+          public List<RunnerApi.Parameter> translateParameters() {
+            List<RunnerApi.Parameter> parameters = new ArrayList<>();
+            for (Parameter parameter : signature.processElement().extraParameters()) {
+              Optional<RunnerApi.Parameter> protoParameter = toProto(parameter);
+              if (protoParameter.isPresent()) {
+                parameters.add(protoParameter.get());
+              }
+            }
+            return parameters;
+          }
+
+          @Override
+          public Map<String, SideInput> translateSideInputs(SdkComponents components) {
+            Map<String, SideInput> sideInputs = new HashMap<>();
+            for (PCollectionView<?> sideInput : parDo.getSideInputs()) {
+              sideInputs.put(sideInput.getTagInternal().getId(), toProto(sideInput));
+            }
+            return sideInputs;
+          }
+
+          @Override
+          public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
+              throws IOException {
+            Map<String, RunnerApi.StateSpec> stateSpecs = new HashMap<>();
+            for (Map.Entry<String, StateDeclaration> state :
+                signature.stateDeclarations().entrySet()) {
+              RunnerApi.StateSpec spec =
+                  toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
+              stateSpecs.put(state.getKey(), spec);
+            }
+            return stateSpecs;
+          }
+
+          @Override
+          public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
+            Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
+            for (Map.Entry<String, TimerDeclaration> timer :
+                signature.timerDeclarations().entrySet()) {
+              RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
+              timerSpecs.put(timer.getKey(), spec);
+            }
+            return timerSpecs;
+          }
+
+          @Override
+          public boolean isSplittable() {
+            return signature.processElement().isSplittable();
+          }
+        },
+        components);
   }
 
   private static StateSpec<?> getStateSpecOrCrash(
@@ -603,4 +647,122 @@ public class ParDoTranslation {
         SerializableUtils.deserializeFromByteArray(
             spec.getPayload().toByteArray(), "Custom WinodwMappingFn");
   }
+
+  static class RawParDo<InputT, OutputT>
+      extends PTransformTranslation.RawPTransform<PCollection<InputT>, PCollection<OutputT>>
+      implements ParDoLike {
+
+    private final RunnerApi.PTransform protoTransform;
+    private final transient RehydratedComponents rehydratedComponents;
+
+    // Parsed from protoTransform and cached
+    private final FunctionSpec spec;
+    private final ParDoPayload payload;
+
+    public RawParDo(RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      this.rehydratedComponents = rehydratedComponents;
+      this.protoTransform = protoTransform;
+      this.spec = protoTransform.getSpec();
+      this.payload = ParDoPayload.parseFrom(spec.getPayload());
+    }
+
+    @Override
+    public FunctionSpec getSpec() {
+      return spec;
+    }
+
+    @Override
+    public FunctionSpec migrate(SdkComponents components) throws IOException {
+      return FunctionSpec.newBuilder()
+          .setUrn(PAR_DO_TRANSFORM_URN)
+          .setPayload(payloadForParDoLike(this, components).toByteString())
+          .build();
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
+      for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+        try {
+          additionalInputs.put(
+              new TupleTag<>(sideInputEntry.getKey()),
+              rehydratedComponents.getPCollection(
+                  protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
+        } catch (IOException exc) {
+          throw new IllegalStateException(
+              String.format(
+                  "Could not find input with name %s for %s transform",
+                  sideInputEntry.getKey(), ParDo.class.getSimpleName()));
+        }
+      }
+      return additionalInputs;
+    }
+
+    @Override
+    public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
+      // TODO: re-register the environment with the new components
+      return payload.getDoFn();
+    }
+
+    @Override
+    public List<RunnerApi.Parameter> translateParameters() {
+      return MoreObjects.firstNonNull(
+          payload.getParametersList(), Collections.<RunnerApi.Parameter>emptyList());
+    }
+
+    @Override
+    public Map<String, SideInput> translateSideInputs(SdkComponents components) {
+      // TODO: re-register the PCollections and UDF environments
+      return MoreObjects.firstNonNull(
+          payload.getSideInputsMap(), Collections.<String, SideInput>emptyMap());
+    }
+
+    @Override
+    public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components) {
+      // TODO: re-register the coders
+      return MoreObjects.firstNonNull(
+          payload.getStateSpecsMap(), Collections.<String, RunnerApi.StateSpec>emptyMap());
+    }
+
+    @Override
+    public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
+      return MoreObjects.firstNonNull(
+          payload.getTimerSpecsMap(), Collections.<String, RunnerApi.TimerSpec>emptyMap());
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return payload.getSplittable();
+    }
+  }
+
+  /** These methods drive to-proto translation from Java and from rehydrated ParDos. */
+  private interface ParDoLike {
+    SdkFunctionSpec translateDoFn(SdkComponents newComponents);
+
+    List<RunnerApi.Parameter> translateParameters();
+
+    Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components);
+
+    Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
+        throws IOException;
+
+    Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents);
+
+    boolean isSplittable();
+  }
+
+  public static ParDoPayload payloadForParDoLike(ParDoLike parDo, SdkComponents components)
+      throws IOException {
+
+    return ParDoPayload.newBuilder()
+        .setDoFn(parDo.translateDoFn(components))
+        .addAllParameters(parDo.translateParameters())
+        .putAllStateSpecs(parDo.translateStateSpecs(components))
+        .putAllTimerSpecs(parDo.translateTimerSpecs(components))
+        .putAllSideInputs(parDo.translateSideInputs(components))
+        .setSplittable(parDo.isSplittable())
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7fb3e793/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 85033e5..c8d38eb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -141,31 +141,9 @@ public class PipelineTranslation {
           rehydratedComponents.getPCollection(outputEntry.getValue()));
     }
 
-    RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
     RawPTransform<?, ?> transform =
         PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
 
-    // By default, no "additional" inputs, since that is an SDK-specific thing.
-    // Only ParDo and WriteFiles really separate main from side inputs
-    Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
-
-    // TODO: ParDoTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
-    if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
-      RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
-      additionalInputs =
-          sideInputMapToAdditionalInputs(
-              transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
-    }
-
-    // TODO: WriteFilesTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
-    if (transformSpec.getUrn().equals(PTransformTranslation.WRITE_FILES_TRANSFORM_URN)) {
-      RunnerApi.WriteFilesPayload payload =
-          RunnerApi.WriteFilesPayload.parseFrom(transformSpec.getPayload());
-      additionalInputs =
-          sideInputMapToAdditionalInputs(
-              transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
-    }
-
     if (isPrimitive(transformProto)) {
       transforms.addFinalizedPrimitiveNode(
           transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);


[09/14] beam git commit: Add custom rehydration for WriteFiles

Posted by ke...@apache.org.
Add custom rehydration for WriteFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/187beae4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/187beae4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/187beae4

Branch: refs/heads/master
Commit: 187beae4d20576d0e0ea1ca80d03252d1f2507e5
Parents: 7fb3e79
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 3 21:17:38 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../construction/WriteFilesTranslation.java     | 166 ++++++++++++++++---
 .../construction/WriteFilesTranslationTest.java |   3 +-
 2 files changed, 148 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/187beae4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 645b562..d0b2182 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -24,12 +24,13 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.WR
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -46,6 +47,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
@@ -59,18 +63,37 @@ public class WriteFilesTranslation {
       "urn:beam:file_based_sink:javasdk:0.1";
 
   @VisibleForTesting
-  static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
-    Map<String, SideInput> sideInputs = Maps.newHashMap();
-    for (PCollectionView<?> view : transform.getSink().getDynamicDestinations().getSideInputs()) {
-      sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view));
-    }
-    return WriteFilesPayload.newBuilder()
-        .setSink(toProto(transform.getSink()))
-        .setWindowedWrites(transform.isWindowedWrites())
-        .setRunnerDeterminedSharding(
-            transform.getNumShards() == null && transform.getSharding() == null)
-        .putAllSideInputs(sideInputs)
-        .build();
+  static WriteFilesPayload payloadForWriteFiles(
+      final WriteFiles<?, ?, ?> transform, SdkComponents components) throws IOException {
+    return payloadForWriteFilesLike(
+        new WriteFilesLike() {
+          @Override
+          public SdkFunctionSpec translateSink(SdkComponents newComponents) {
+            // TODO: register the environment
+            return toProto(transform.getSink());
+          }
+
+          @Override
+          public Map<String, SideInput> translateSideInputs(SdkComponents components) {
+            Map<String, SideInput> sideInputs = new HashMap<>();
+            for (PCollectionView<?> view :
+                transform.getSink().getDynamicDestinations().getSideInputs()) {
+              sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view));
+            }
+            return sideInputs;
+          }
+
+          @Override
+          public boolean isWindowedWrites() {
+            return transform.isWindowedWrites();
+          }
+
+          @Override
+          public boolean isRunnerDeterminedSharding() {
+            return transform.getNumShards() == null && transform.getSharding() == null;
+          }
+        },
+        components);
   }
 
   private static SdkFunctionSpec toProto(FileBasedSink<?, ?, ?> sink) {
@@ -174,8 +197,82 @@ public class WriteFilesTranslation {
             .getPayload());
   }
 
-  static class WriteFilesTranslator
-      extends TransformPayloadTranslator.WithDefaultRehydration<WriteFiles<?, ?, ?>> {
+  static class RawWriteFiles extends PTransformTranslation.RawPTransform<PInput, POutput>
+      implements WriteFilesLike {
+
+    private final RunnerApi.PTransform protoTransform;
+    private final transient RehydratedComponents rehydratedComponents;
+
+    // Parsed from protoTransform and cached
+    private final FunctionSpec spec;
+    private final RunnerApi.WriteFilesPayload payload;
+
+    public RawWriteFiles(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      this.rehydratedComponents = rehydratedComponents;
+      this.protoTransform = protoTransform;
+      this.spec = protoTransform.getSpec();
+      this.payload = RunnerApi.WriteFilesPayload.parseFrom(spec.getPayload());
+    }
+
+    @Override
+    public FunctionSpec getSpec() {
+      return spec;
+    }
+
+    @Override
+    public FunctionSpec migrate(SdkComponents components) throws IOException {
+      return FunctionSpec.newBuilder()
+          .setUrn(WRITE_FILES_TRANSFORM_URN)
+          .setPayload(payloadForWriteFilesLike(this, components).toByteString())
+          .build();
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
+      for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
+        try {
+          additionalInputs.put(
+              new TupleTag<>(sideInputEntry.getKey()),
+              rehydratedComponents.getPCollection(
+                  protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
+        } catch (IOException exc) {
+          throw new IllegalStateException(
+              String.format(
+                  "Could not find input with name %s for %s transform",
+                  sideInputEntry.getKey(), WriteFiles.class.getSimpleName()));
+        }
+      }
+      return additionalInputs;
+    }
+
+    @Override
+    public SdkFunctionSpec translateSink(SdkComponents newComponents) {
+      // TODO: re-register the environment with the new components
+      return payload.getSink();
+    }
+
+    @Override
+    public Map<String, SideInput> translateSideInputs(SdkComponents components) {
+      // TODO: re-register the PCollections and UDF environments
+      return MoreObjects.firstNonNull(
+          payload.getSideInputsMap(), Collections.<String, SideInput>emptyMap());
+    }
+
+    @Override
+    public boolean isWindowedWrites() {
+      return payload.getWindowedWrites();
+    }
+
+    @Override
+    public boolean isRunnerDeterminedSharding() {
+      return payload.getRunnerDeterminedSharding();
+    }
+  }
+
+  static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
     @Override
     public String getUrn(WriteFiles<?, ?, ?> transform) {
       return WRITE_FILES_TRANSFORM_URN;
@@ -183,14 +280,21 @@ public class WriteFilesTranslation {
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) {
+        AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components)
+        throws IOException {
       return FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setPayload(toProto(transform.getTransform()).toByteString())
+          .setPayload(payloadForWriteFiles(transform.getTransform(), components).toByteString())
           .build();
     }
-  }
 
+    @Override
+    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      return new RawWriteFiles(protoTransform, rehydratedComponents);
+    }
+  }
   /** Registers {@link WriteFilesTranslator}. */
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -202,8 +306,30 @@ public class WriteFilesTranslation {
     }
 
     @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
+    public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.singletonMap(WRITE_FILES_TRANSFORM_URN, new WriteFilesTranslator());
     }
   }
+
+  /** These methods drive to-proto translation from Java and from rehydrated WriteFiles. */
+  private interface WriteFilesLike {
+    SdkFunctionSpec translateSink(SdkComponents newComponents);
+
+    Map<String, RunnerApi.SideInput> translateSideInputs(SdkComponents components);
+
+    boolean isWindowedWrites();
+
+    boolean isRunnerDeterminedSharding();
+  }
+
+  public static WriteFilesPayload payloadForWriteFilesLike(
+      WriteFilesLike writeFiles, SdkComponents components) throws IOException {
+
+    return WriteFilesPayload.newBuilder()
+        .setSink(writeFiles.translateSink(components))
+        .putAllSideInputs(writeFiles.translateSideInputs(components))
+        .setWindowedWrites(writeFiles.isWindowedWrites())
+        .setRunnerDeterminedSharding(writeFiles.isRunnerDeterminedSharding())
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/187beae4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index c874828..4bc61d4 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -76,7 +76,8 @@ public class WriteFilesTranslationTest {
 
     @Test
     public void testEncodedProto() throws Exception {
-      RunnerApi.WriteFilesPayload payload = WriteFilesTranslation.toProto(writeFiles);
+      RunnerApi.WriteFilesPayload payload =
+          WriteFilesTranslation.payloadForWriteFiles(writeFiles, SdkComponents.create());
 
       assertThat(
           payload.getRunnerDeterminedSharding(),


[02/14] beam git commit: Fix typo in UnboundedSource deserialization error message

Posted by ke...@apache.org.
Fix typo in UnboundedSource deserialization error message


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11368e08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11368e08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11368e08

Branch: refs/heads/master
Commit: 11368e08b2afdb7835eeaef7d868f819f44e631e
Parents: c0cb28c
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 9 20:00:46 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/construction/ReadTranslation.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/11368e08/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index 4b14c51..ee89562 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -134,7 +134,7 @@ public class ReadTranslation {
     checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED));
     return (UnboundedSource<?, ?>)
         SerializableUtils.deserializeFromByteArray(
-            payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
+            payload.getSource().getSpec().getPayload().toByteArray(), "UnboundedSource");
   }
 
   public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {


[07/14] beam git commit: Add custom rehydration for Combine

Posted by ke...@apache.org.
Add custom rehydration for Combine


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92209c32
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92209c32
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92209c32

Branch: refs/heads/master
Commit: 92209c323eb54e8a57b496eb2035da44fec00714
Parents: 6abf6f5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 3 11:40:54 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CombineTranslation.java   | 165 ++++++++++++++++++-
 .../construction/CombineTranslationTest.java    |  16 +-
 2 files changed, 161 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/92209c32/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 69591ee..21796aa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -52,12 +55,12 @@ import org.apache.beam.sdk.values.PCollection;
  * RunnerApi.CombinePayload} protos.
  */
 public class CombineTranslation {
+
   public static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:combinefn:javasdk:v1";
 
   /** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */
   public static class CombinePayloadTranslator
-      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
-          Combine.PerKey<?, ?, ?>> {
+      implements PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> {
     public static TransformPayloadTranslator create() {
       return new CombinePayloadTranslator();
     }
@@ -73,13 +76,25 @@ public class CombineTranslation {
     public FunctionSpec translate(
         AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
         throws IOException {
-      CombinePayload payload = toProto(transform, components);
-      return RunnerApi.FunctionSpec.newBuilder()
+      return FunctionSpec.newBuilder()
           .setUrn(COMBINE_TRANSFORM_URN)
-          .setPayload(payload.toByteString())
+          .setPayload(payloadForCombine((AppliedPTransform) transform, components).toByteString())
           .build();
     }
 
+    @Override
+    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      checkArgument(
+          protoTransform.getSpec() != null,
+          "%s received transform with null spec",
+          getClass().getSimpleName());
+      checkArgument(protoTransform.getSpec().getUrn().equals(COMBINE_TRANSFORM_URN));
+      return new RawCombine<>(
+          CombinePayload.parseFrom(protoTransform.getSpec().getPayload()), rehydratedComponents);
+    }
+
     /** Registers {@link CombinePayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -90,13 +105,147 @@ public class CombineTranslation {
       }
 
       @Override
-      public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-        return Collections.emptyMap();
+      public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.singletonMap(COMBINE_TRANSFORM_URN, new CombinePayloadTranslator());
+      }
+    }
+  }
+
+  /**
+   * These methods drive to-proto translation for both Java SDK transforms and rehydrated
+   * transforms.
+   */
+  interface CombineLike {
+    RunnerApi.SdkFunctionSpec getCombineFn();
+
+    Coder<?> getAccumulatorCoder();
+
+    Map<String, RunnerApi.SideInput> getSideInputs();
+  }
+
+  /** Produces a {@link RunnerApi.CombinePayload} from a portable {@link CombineLike}. */
+  static RunnerApi.CombinePayload payloadForCombineLike(
+      CombineLike combine, SdkComponents components) throws IOException {
+    return RunnerApi.CombinePayload.newBuilder()
+        .setAccumulatorCoderId(components.registerCoder(combine.getAccumulatorCoder()))
+        .putAllSideInputs(combine.getSideInputs())
+        .setCombineFn(combine.getCombineFn())
+        .build();
+  }
+
+  static <K, InputT, OutputT> CombinePayload payloadForCombine(
+      final AppliedPTransform<
+              PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+              Combine.PerKey<K, InputT, OutputT>>
+          combine,
+      SdkComponents components)
+      throws IOException {
+
+    return payloadForCombineLike(
+        new CombineLike() {
+          @Override
+          public SdkFunctionSpec getCombineFn() {
+            return SdkFunctionSpec.newBuilder()
+                // TODO: Set Java SDK Environment
+                .setSpec(
+                    FunctionSpec.newBuilder()
+                        .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
+                        .setPayload(
+                            ByteString.copyFrom(
+                                SerializableUtils.serializeToByteArray(
+                                    combine.getTransform().getFn())))
+                        .build())
+                .build();
+          }
+
+          @Override
+          public Coder<?> getAccumulatorCoder() {
+            GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
+            try {
+              return extractAccumulatorCoder(combineFn, (AppliedPTransform) combine);
+            } catch (CannotProvideCoderException e) {
+              throw new IllegalStateException(e);
+            }
+          }
+
+          @Override
+          public Map<String, SideInput> getSideInputs() {
+            // TODO: support side inputs
+            return ImmutableMap.of();
+          }
+        },
+        components);
+  }
+
+  private static class RawCombine<K, InputT, AccumT, OutputT>
+      extends PTransformTranslation.RawPTransform<
+          PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
+      implements CombineLike {
+
+    private final transient RehydratedComponents rehydratedComponents;
+    private final FunctionSpec spec;
+    private final CombinePayload payload;
+    private final Coder<AccumT> accumulatorCoder;
+
+    private RawCombine(CombinePayload payload, RehydratedComponents rehydratedComponents) {
+      this.rehydratedComponents = rehydratedComponents;
+      this.payload = payload;
+      this.spec =
+          FunctionSpec.newBuilder()
+              .setUrn(COMBINE_TRANSFORM_URN)
+              .setPayload(payload.toByteString())
+              .build();
+
+      // Eagerly extract the coder to throw a good exception here
+      try {
+        this.accumulatorCoder =
+            (Coder<AccumT>) rehydratedComponents.getCoder(payload.getAccumulatorCoderId());
+      } catch (IOException exc) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failure extracting accumulator coder with id '%s' for %s",
+                payload.getAccumulatorCoderId(), Combine.class.getSimpleName()),
+            exc);
       }
     }
+
+    @Override
+    public String getUrn() {
+      return COMBINE_TRANSFORM_URN;
+    }
+
+    @Nonnull
+    @Override
+    public FunctionSpec getSpec() {
+      return spec;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec migrate(SdkComponents sdkComponents) throws IOException {
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(COMBINE_TRANSFORM_URN)
+          .setPayload(payloadForCombineLike(this, sdkComponents).toByteString())
+          .build();
+    }
+
+    @Override
+    public SdkFunctionSpec getCombineFn() {
+      return payload.getCombineFn();
+    }
+
+    @Override
+    public Coder<?> getAccumulatorCoder() {
+      return accumulatorCoder;
+    }
+
+    @Override
+    public Map<String, SideInput> getSideInputs() {
+      return payload.getSideInputsMap();
+    }
   }
 
-  public static CombinePayload toProto(
+  @VisibleForTesting
+  static CombinePayload toProto(
       AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> combine, SdkComponents sdkComponents)
       throws IOException {
     GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();

http://git-wip-us.apache.org/repos/asf/beam/blob/92209c32/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
index 8740d7f..af162d3 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
@@ -52,15 +52,11 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-/**
- * Tests for {@link CombineTranslation}.
- */
+/** Tests for {@link CombineTranslation}. */
 @RunWith(Enclosed.class)
 public class CombineTranslationTest {
 
-  /**
-   * Tests that simple {@link CombineFn CombineFns} can be translated to and from proto.
-   */
+  /** Tests that simple {@link CombineFn CombineFns} can be translated to and from proto. */
   @RunWith(Parameterized.class)
   public static class TranslateSimpleCombinesTest {
     @Parameters(name = "{index}: {0}")
@@ -111,14 +107,10 @@ public class CombineTranslationTest {
     }
   }
 
-
-  /**
-   * Tests that a {@link CombineFnWithContext} can be translated.
-   */
+  /** Tests that a {@link CombineFnWithContext} can be translated. */
   @RunWith(JUnit4.class)
   public static class ValidateCombineWithContextTest {
-    @Rule
-    public TestPipeline pipeline = TestPipeline.create();
+    @Rule public TestPipeline pipeline = TestPipeline.create();
 
     @Test
     public void testToFromProtoWithSideInputs() throws Exception {


[13/14] beam git commit: DirectRunner: Replace use of RawPTransform with NotSerializable.forUrn translators

Posted by ke...@apache.org.
DirectRunner: Replace use of RawPTransform with NotSerializable.forUrn translators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/505021e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/505021e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/505021e6

Branch: refs/heads/master
Commit: 505021e6a253b882bb870694ff7540418e809e51
Parents: 01103c2
Author: Kenneth Knowles <ke...@apache.org>
Authored: Tue Oct 17 12:43:19 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 13:48:28 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectGroupByKey.java   | 30 ++------------------
 .../direct/ParDoMultiOverrideFactory.java       | 16 +----------
 .../direct/TestStreamEvaluatorFactory.java      | 16 +----------
 .../direct/TransformEvaluatorRegistry.java      | 14 +++++----
 .../runners/direct/ViewOverrideFactory.java     | 16 +----------
 5 files changed, 13 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 9e56b65..0053360 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,12 +20,9 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -74,8 +71,7 @@ class DirectGroupByKey<K, V>
   }
 
   static final class DirectGroupByKeyOnly<K, V>
-      extends PTransformTranslation.RawPTransform<
-          PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
     @Override
     public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
@@ -89,21 +85,10 @@ class DirectGroupByKey<K, V>
     }
 
     DirectGroupByKeyOnly() {}
-
-    @Override
-    public String getUrn() {
-      return DIRECT_GBKO_URN;
-    }
-
-    @Nullable
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return null;
-    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
-      extends PTransformTranslation.RawPTransform<
+      extends PTransform<
           PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
 
     private final WindowingStrategy<?, ?> inputWindowingStrategy;
@@ -144,16 +129,5 @@ class DirectGroupByKey<K, V>
           input.getPipeline(), outputWindowingStrategy, input.isBounded(),
           KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder())));
     }
-
-    @Override
-    public String getUrn() {
-      return DIRECT_GABW_URN;
-    }
-
-    @Nullable
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return null;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 5ec52be..e8a9c83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -23,12 +23,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
@@ -204,8 +202,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       "urn:beam:directrunner:transforms:stateful_pardo:v1";
 
   static class StatefulParDo<K, InputT, OutputT>
-      extends PTransformTranslation.RawPTransform<
-          PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
+      extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
     private final transient DoFn<KV<K, InputT>, OutputT> doFn;
     private final TupleTagList additionalOutputTags;
     private final TupleTag<OutputT> mainOutputTag;
@@ -257,17 +254,6 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
       return outputs;
     }
-
-    @Override
-    public String getUrn() {
-      return DIRECT_STATEFUL_PAR_DO_URN;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      throw new UnsupportedOperationException(
-          String.format("%s should never be serialized to proto", getClass().getSimpleName()));
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index d62b64c..e42b5fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,8 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.TestStreamTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -194,8 +192,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
 
     static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1";
 
-    static class DirectTestStream<T>
-        extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> {
+    static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
       private final transient DirectRunner runner;
       private final TestStream<T> original;
 
@@ -214,17 +211,6 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
             IsBounded.UNBOUNDED,
             original.getValueCoder());
       }
-
-      @Override
-      public String getUrn() {
-        return DIRECT_TEST_STREAM_URN;
-      }
-
-      @Nullable
-      @Override
-      public RunnerApi.FunctionSpec getSpec() {
-        return null;
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 099252f..708a931 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -98,20 +98,22 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
           .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
           .put(
               DirectGroupByKey.DirectGroupByKeyOnly.class,
-              new PTransformTranslation.RawPTransformTranslator())
+              TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_GBKO_URN))
           .put(
               DirectGroupByKey.DirectGroupAlsoByWindow.class,
-              new PTransformTranslation.RawPTransformTranslator())
+              TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_GABW_URN))
           .put(
               ParDoMultiOverrideFactory.StatefulParDo.class,
-              new PTransformTranslation.RawPTransformTranslator())
+              TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_STATEFUL_PAR_DO_URN))
           .put(
               ViewOverrideFactory.WriteView.class,
-              new PTransformTranslation.RawPTransformTranslator())
-          .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator())
+              TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_WRITE_VIEW_URN))
+          .put(
+              DirectTestStream.class,
+              TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_TEST_STREAM_URN))
           .put(
               SplittableParDoViaKeyedWorkItems.ProcessElements.class,
-              new SplittableParDoProcessElementsTranslator())
+              TransformPayloadTranslator.NotSerializable.forUrn(SPLITTABLE_PROCESS_URN))
           .build();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 61b7978..0079f98 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,11 +20,8 @@ package org.apache.beam.runners.direct;
 
 import java.io.IOException;
 import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -107,7 +104,7 @@ class ViewOverrideFactory<ElemT, ViewT>
    * to {@link ViewT}.
    */
   static final class WriteView<ElemT, ViewT>
-      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
+      extends PTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
     private final PCollectionView<ViewT> view;
 
     WriteView(PCollectionView<ViewT> view) {
@@ -125,17 +122,6 @@ class ViewOverrideFactory<ElemT, ViewT>
     public PCollectionView<ViewT> getView() {
       return view;
     }
-
-    @Override
-    public String getUrn() {
-      return DIRECT_WRITE_VIEW_URN;
-    }
-
-    @Nullable
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return null;
-    }
   }
 
   public static final String DIRECT_WRITE_VIEW_URN =


[05/14] beam git commit: Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform

Posted by ke...@apache.org.
Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10c63e15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10c63e15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10c63e15

Branch: refs/heads/master
Commit: 10c63e15ab51b885372f7b6251d8ace63bae0ad1
Parents: c14455e
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 19:25:28 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CombineTranslation.java   |  33 ++--
 .../CreatePCollectionViewTranslation.java       |   7 +-
 .../core/construction/FlattenTranslator.java    |   8 +-
 .../construction/GroupByKeyTranslation.java     |  13 +-
 .../construction/PTransformTranslation.java     | 155 ++++++++++++++++++-
 .../core/construction/ParDoTranslation.java     |  70 ++++-----
 .../core/construction/PipelineTranslation.java  |  76 +--------
 .../core/construction/ReadTranslation.java      |  43 +++--
 .../construction/TestStreamTranslation.java     |   8 +-
 .../TransformPayloadTranslatorRegistrar.java    |   2 +
 .../construction/WindowIntoTranslation.java     |  15 +-
 .../construction/WriteFilesTranslation.java     |  16 +-
 .../direct/TransformEvaluatorRegistry.java      |  18 +--
 runners/flink/pom.xml                           |   5 -
 .../FlinkStreamingTransformTranslators.java     |  60 ++-----
 15 files changed, 280 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index c3d9553..69591ee 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -54,11 +54,10 @@ import org.apache.beam.sdk.values.PCollection;
 public class CombineTranslation {
   public static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:combinefn:javasdk:v1";
 
-   /**
-   * A {@link TransformPayloadTranslator} for {@link Combine.PerKey}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */
   public static class CombinePayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Combine.PerKey<?, ?, ?>> {
     public static TransformPayloadTranslator create() {
       return new CombinePayloadTranslator();
     }
@@ -81,9 +80,7 @@ public class CombineTranslation {
           .build();
     }
 
-    /**
-     * Registers {@link CombinePayloadTranslator}.
-     */
+    /** Registers {@link CombinePayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements TransformPayloadTranslatorRegistrar {
       @Override
@@ -91,6 +88,11 @@ public class CombineTranslation {
           getTransformPayloadTranslators() {
         return Collections.singletonMap(Combine.PerKey.class, new CombinePayloadTranslator());
       }
+
+      @Override
+      public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.emptyMap();
+      }
     }
   }
 
@@ -136,8 +138,7 @@ public class CombineTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
-                .setPayload(
-                    ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
+                .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
                 .build())
         .build();
   }
@@ -148,8 +149,8 @@ public class CombineTranslation {
     return components.getCoder(id);
   }
 
-  public static Coder<?> getAccumulatorCoder(
-      AppliedPTransform<?, ?, ?> transform) throws IOException {
+  public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> transform)
+      throws IOException {
     SdkComponents sdkComponents = SdkComponents.create();
     String id = getCombinePayload(transform, sdkComponents).getAccumulatorCoderId();
     Components components = sdkComponents.toComponents();
@@ -157,17 +158,11 @@ public class CombineTranslation {
         components.getCodersOrThrow(id), RehydratedComponents.forComponents(components));
   }
 
-  public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload)
-      throws IOException {
+  public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) throws IOException {
     checkArgument(payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN));
     return (GlobalCombineFn<?, ?, ?>)
         SerializableUtils.deserializeFromByteArray(
-            payload
-                .getCombineFn()
-                .getSpec()
-                .getPayload()
-                .toByteArray(),
-            "CombineFn");
+            payload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
   }
 
   public static GlobalCombineFn<?, ?, ?> getCombineFn(AppliedPTransform<?, ?, ?> transform)

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index 4b8edcf..709cb8a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -88,7 +88,7 @@ public class CreatePCollectionViewTranslation {
    */
   @Deprecated
   static class CreatePCollectionViewTranslator
-      implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
+      extends TransformPayloadTranslator.WithDefaultRehydration<View.CreatePCollectionView<?, ?>> {
     @Override
     public String getUrn(View.CreatePCollectionView<?, ?> transform) {
       return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
@@ -122,5 +122,10 @@ public class CreatePCollectionViewTranslation {
       return Collections.singletonMap(
           View.CreatePCollectionView.class, new CreatePCollectionViewTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
index c9798e6..972c453 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
@@ -32,7 +32,8 @@ import org.apache.beam.sdk.transforms.windowing.Window.Assign;
 /**
  * Utility methods for translating a {@link Assign} to and from {@link RunnerApi} representations.
  */
-public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCollections<?>> {
+public class FlattenTranslator
+    extends TransformPayloadTranslator.WithDefaultRehydration<Flatten.PCollections<?>> {
 
   public static TransformPayloadTranslator create() {
     return new FlattenTranslator();
@@ -59,5 +60,10 @@ public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCo
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Flatten.PCollections.class, new FlattenTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
index 840bae2..0803ad3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
@@ -34,7 +34,8 @@ import org.apache.beam.sdk.transforms.PTransform;
  */
 public class GroupByKeyTranslation {
 
-  static class GroupByKeyTranslator implements TransformPayloadTranslator<GroupByKey<?, ?>> {
+  static class GroupByKeyTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<GroupByKey<?, ?>> {
     @Override
     public String getUrn(GroupByKey<?, ?> transform) {
       return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
@@ -43,13 +44,10 @@ public class GroupByKeyTranslation {
     @Override
     public FunctionSpec translate(
         AppliedPTransform<?, ?, GroupByKey<?, ?>> transform, SdkComponents components) {
-      return FunctionSpec.newBuilder()
-          .setUrn(getUrn(transform.getTransform()))
-          .build();
+      return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
     }
   }
 
-
   /** Registers {@link GroupByKeyTranslator}. */
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Registrar implements TransformPayloadTranslatorRegistrar {
@@ -58,5 +56,10 @@ public class GroupByKeyTranslation {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(GroupByKey.class, new GroupByKeyTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 31767a0..785b9e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -20,7 +20,9 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Joiner;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.io.IOException;
@@ -74,6 +76,12 @@ public class PTransformTranslation {
   private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
       KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
 
+  private static final Map<String, TransformPayloadTranslator> KNOWN_REHYDRATORS =
+      loadTransformRehydrators();
+
+  private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR =
+      new RawPTransformTranslator();
+
   private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
       loadTransformPayloadTranslators() {
     HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>();
@@ -98,6 +106,29 @@ public class PTransformTranslation {
     return ImmutableMap.copyOf(translators);
   }
 
+  private static Map<String, TransformPayloadTranslator> loadTransformRehydrators() {
+    HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>();
+
+    for (TransformPayloadTranslatorRegistrar registrar :
+        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+      Map<String, ? extends TransformPayloadTranslator> newRehydrators =
+          registrar.getTransformRehydrators();
+
+      Set<String> alreadyRegistered =
+          Sets.intersection(rehydrators.keySet(), newRehydrators.keySet());
+
+      if (!alreadyRegistered.isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "URNs already registered: %s", Joiner.on(", ").join(alreadyRegistered)));
+      }
+
+      rehydrators.putAll(newRehydrators);
+    }
+    return ImmutableMap.copyOf(rehydrators);
+  }
+
   private PTransformTranslation() {}
 
   /**
@@ -150,17 +181,36 @@ public class PTransformTranslation {
       // context of our current serialization
       transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components));
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
-      FunctionSpec payload =
+      transformBuilder.setSpec(
           KNOWN_PAYLOAD_TRANSLATORS
               .get(transform.getClass())
-              .translate(appliedPTransform, components);
-      transformBuilder.setSpec(payload);
+              .translate(appliedPTransform, components));
     }
 
     return transformBuilder.build();
   }
 
   /**
+   * Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform} specialized for the URN
+   * and spec.
+   */
+  static RawPTransform<?, ?> rehydrate(
+      RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+      throws IOException {
+
+    @Nullable
+    TransformPayloadTranslator<?> rehydrator =
+        KNOWN_REHYDRATORS.get(
+            protoTransform.getSpec() == null ? null : protoTransform.getSpec().getUrn());
+
+    if (rehydrator == null) {
+      return DEFAULT_REHYDRATOR.rehydrate(protoTransform, rehydratedComponents);
+    } else {
+      return rehydrator.rehydrate(protoTransform, rehydratedComponents);
+    }
+  }
+
+  /**
    * Translates a composite {@link AppliedPTransform} into a runner API proto with no component
    * transforms.
    *
@@ -206,14 +256,66 @@ public class PTransformTranslation {
   }
 
   /**
-   * A translator consumes a {@link PTransform} application and produces the appropriate
-   * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
+   * A bi-directional translator between a Java-based {@link PTransform} and a protobuf payload for
+   * that transform.
+   *
+   * <p>When going to a protocol buffer message, the translator produces a payload corresponding to
+   * the Java representation while registering components that payload references.
+   *
+   * <p>When "rehydrating" a protocol buffer message, the translator returns a {@link RawPTransform}
+   * rather than a Java-based {@link PTransform}, because the original transform may not have been
+   * written in Java. The resulting {@link RawPTransform} subclass encapsulates the knowledge of
+   * which components are referenced in the payload.
    */
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
     String getUrn(T transform);
 
     FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
         throws IOException;
+
+    RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException;
+
+    /**
+     * A {@link TransformPayloadTranslator} for transforms that contain no references to components,
+     * so they do not need a specialized rehydration.
+     */
+    abstract class WithDefaultRehydration<T extends PTransform<?, ?>>
+        implements TransformPayloadTranslator<T> {
+      @Override
+      public final RawPTransform<?, ?> rehydrate(
+          RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+          throws IOException {
+        return UnknownRawPTransform.forSpec(protoTransform.getSpec());
+      }
+    }
+
+    /**
+     * A {@link TransformPayloadTranslator} for transforms that have no serialized form, so both
+     * {@link #translate} and {@link #rehydrate} throw {@link UnsupportedOperationException}.
+     */
+    abstract class NotSerializable<T extends PTransform<?, ?>>
+        implements TransformPayloadTranslator<T> {
+      @Override
+      public final FunctionSpec translate(
+          AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException {
+        throw new UnsupportedOperationException(
+            String.format(
+                "%s should never be translated",
+                transform.getTransform().getClass().getCanonicalName()));
+      }
+
+      @Override
+      public final RawPTransform<?, ?> rehydrate(
+          RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+          throws IOException {
+        throw new UnsupportedOperationException(
+            String.format(
+                "%s.rehydrate should never be called; there is no serialized form",
+                getClass().getCanonicalName()));
+      }
+    }
   }
 
   /**
@@ -264,6 +366,43 @@ public class PTransformTranslation {
     }
   }
 
+  @AutoValue
+  abstract static class UnknownRawPTransform extends RawPTransform<PInput, POutput> {
+
+    @Override
+    public String getUrn() {
+      return getSpec() == null ? null : getSpec().getUrn();
+    }
+
+    @Nullable
+    public abstract RunnerApi.FunctionSpec getSpec();
+
+    public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) {
+      return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec);
+    }
+
+    @Override
+    public POutput expand(PInput input) {
+      throw new IllegalStateException(
+          String.format(
+              "%s should never be asked to expand;"
+                  + " it is the result of deserializing an already-constructed Pipeline",
+              getClass().getSimpleName()));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("urn", getUrn())
+          .add("payload", getSpec())
+          .toString();
+    }
+
+    public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents components) {
+      return getSpec();
+    }
+  }
+
   /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
   public static class RawPTransformTranslator
       implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@@ -278,5 +417,11 @@ public class PTransformTranslation {
         throws IOException {
       return transform.getTransform().migrate(components);
     }
+
+    @Override
+    public RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) {
+      return UnknownRawPTransform.forSpec(protoTransform.getSpec());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 3886e47..5092448 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -79,29 +79,20 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
-/**
- * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
- */
+/** Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. */
 public class ParDoTranslation {
-  /**
-   * The URN for an unknown Java {@link DoFn}.
-   */
+  /** The URN for an unknown Java {@link DoFn}. */
   public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1";
-  /**
-   * The URN for an unknown Java {@link ViewFn}.
-   */
+  /** The URN for an unknown Java {@link ViewFn}. */
   public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1";
-  /**
-   * The URN for an unknown Java {@link WindowMappingFn}.
-   */
+  /** The URN for an unknown Java {@link WindowMappingFn}. */
   public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
       "urn:beam:windowmappingfn:javasdk:0.1";
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link ParDo}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link ParDo}. */
   public static class ParDoPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          ParDo.MultiOutput<?, ?>> {
     public static TransformPayloadTranslator create() {
       return new ParDoPayloadTranslator();
     }
@@ -124,9 +115,7 @@ public class ParDoTranslation {
           .build();
     }
 
-    /**
-     * Registers {@link ParDoPayloadTranslator}.
-     */
+    /** Registers {@link ParDoPayloadTranslator}. */
     @AutoService(TransformPayloadTranslatorRegistrar.class)
     public static class Registrar implements TransformPayloadTranslatorRegistrar {
       @Override
@@ -134,11 +123,16 @@ public class ParDoTranslation {
           getTransformPayloadTranslators() {
         return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator());
       }
+
+      @Override
+      public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.emptyMap();
+      }
     }
   }
 
   public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components)
-  throws IOException {
+      throws IOException {
     DoFn<?, ?> doFn = parDo.getFn();
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     Map<String, StateDeclaration> states = signature.stateDeclarations();
@@ -158,13 +152,11 @@ public class ParDoTranslation {
       }
     }
     for (Map.Entry<String, StateDeclaration> state : states.entrySet()) {
-      RunnerApi.StateSpec spec =
-          toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
+      RunnerApi.StateSpec spec = toProto(getStateSpecOrCrash(state.getValue(), doFn), components);
       builder.putStateSpecs(state.getKey(), spec);
     }
     for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) {
-      RunnerApi.TimerSpec spec =
-          toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
+      RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn));
       builder.putTimerSpecs(timer.getKey(), spec);
     }
     return builder.build();
@@ -174,7 +166,8 @@ public class ParDoTranslation {
       StateDeclaration stateDeclaration, DoFn<?, ?> target) {
     try {
       Object fieldValue = stateDeclaration.field().get(target);
-      checkState(fieldValue instanceof StateSpec,
+      checkState(
+          fieldValue instanceof StateSpec,
           "Malformed %s class %s: state declaration field %s does not have type %s.",
           DoFn.class.getSimpleName(),
           target.getClass().getName(),
@@ -196,7 +189,8 @@ public class ParDoTranslation {
       TimerDeclaration timerDeclaration, DoFn<?, ?> target) {
     try {
       Object fieldValue = timerDeclaration.field().get(target);
-      checkState(fieldValue instanceof TimerSpec,
+      checkState(
+          fieldValue instanceof TimerSpec,
           "Malformed %s class %s: timer declaration field %s does not have type %s.",
           DoFn.class.getSimpleName(),
           target.getClass().getName(),
@@ -273,8 +267,7 @@ public class ParDoTranslation {
     }
 
     SdkComponents sdkComponents = SdkComponents.create();
-    RunnerApi.PTransform parDoProto =
-        PTransformTranslation.toProto(application, sdkComponents);
+    RunnerApi.PTransform parDoProto = PTransformTranslation.toProto(application, sdkComponents);
     ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload());
 
     List<PCollectionView<?>> views = new ArrayList<>();
@@ -289,12 +282,7 @@ public class ParDoTranslation {
               "no input with tag %s",
               sideInputTag);
       views.add(
-          viewFromProto(
-              sideInput,
-              sideInputTag,
-              originalPCollection,
-              parDoProto,
-              components));
+          viewFromProto(sideInput, sideInputTag, originalPCollection, parDoProto, components));
     }
     return views;
   }
@@ -414,7 +402,6 @@ public class ParDoTranslation {
       default:
         throw new IllegalArgumentException(
             String.format("Unknown %s: %s", RunnerApi.StateSpec.class.getName(), stateSpec));
-
     }
   }
 
@@ -431,7 +418,7 @@ public class ParDoTranslation {
   }
 
   private static RunnerApi.TimeDomain.Enum toProto(TimeDomain timeDomain) {
-    switch(timeDomain) {
+    switch (timeDomain) {
       case EVENT_TIME:
         return RunnerApi.TimeDomain.Enum.EVENT_TIME;
       case PROCESSING_TIME:
@@ -445,12 +432,12 @@ public class ParDoTranslation {
 
   @AutoValue
   abstract static class DoFnAndMainOutput implements Serializable {
-    public static DoFnAndMainOutput of(
-        DoFn<?, ?> fn, TupleTag<?> tag) {
+    public static DoFnAndMainOutput of(DoFn<?, ?> fn, TupleTag<?> tag) {
       return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag);
     }
 
     abstract DoFn<?, ?> getDoFn();
+
     abstract TupleTag<?> getMainOutputTag();
   }
 
@@ -475,8 +462,7 @@ public class ParDoTranslation {
         FunctionSpec.class.getSimpleName(),
         CUSTOM_JAVA_DO_FN_URN,
         fnSpec.getSpec().getUrn());
-    byte[] serializedFn =
-        fnSpec.getSpec().getPayload().toByteArray();
+    byte[] serializedFn = fnSpec.getSpec().getPayload().toByteArray();
     return (DoFnAndMainOutput)
         SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
   }
@@ -505,9 +491,7 @@ public class ParDoTranslation {
   public static SideInput toProto(PCollectionView<?> view) {
     Builder builder = SideInput.newBuilder();
     builder.setAccessPattern(
-        FunctionSpec.newBuilder()
-            .setUrn(view.getViewFn().getMaterialization().getUrn())
-            .build());
+        FunctionSpec.newBuilder().setUrn(view.getViewFn().getMaterialization().getUrn()).build());
     builder.setViewFn(toProto(view.getViewFn()));
     builder.setWindowMappingFn(toProto(view.getWindowMappingFn()));
     return builder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 1624865..85033e5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.auto.value.AutoValue;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import java.io.IOException;
@@ -32,12 +30,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -47,8 +43,6 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -148,6 +142,8 @@ public class PipelineTranslation {
     }
 
     RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
+    RawPTransform<?, ?> transform =
+        PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
 
     // By default, no "additional" inputs, since that is an SDK-specific thing.
     // Only ParDo and WriteFiles really separate main from side inputs
@@ -170,20 +166,6 @@ public class PipelineTranslation {
               transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
     }
 
-    // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
-    List<Coder<?>> additionalCoders = Collections.emptyList();
-    if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
-      RunnerApi.CombinePayload payload =
-          RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload());
-      additionalCoders =
-          (List)
-              Collections.singletonList(
-                  rehydratedComponents.getCoder(payload.getAccumulatorCoderId()));
-    }
-
-    RehydratedPTransform transform =
-        RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders);
-
     if (isPrimitive(transformProto)) {
       transforms.addFinalizedPrimitiveNode(
           transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
@@ -232,58 +214,4 @@ public class PipelineTranslation {
             .values()
             .containsAll(transformProto.getOutputsMap().values());
   }
-
-  @AutoValue
-  abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
-
-    @Nullable
-    public abstract RunnerApi.FunctionSpec getSpec();
-
-    @Override
-    public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
-
-    public abstract List<Coder<?>> getCoders();
-
-    @Override
-    public String getUrn() {
-      return getSpec().getUrn();
-    }
-
-    public static RehydratedPTransform of(
-        RunnerApi.FunctionSpec payload,
-        Map<TupleTag<?>, PValue> additionalInputs,
-        List<Coder<?>> additionalCoders) {
-      return new AutoValue_PipelineTranslation_RehydratedPTransform(
-          payload, additionalInputs, additionalCoders);
-    }
-
-    @Override
-    public POutput expand(PInput input) {
-      throw new IllegalStateException(
-          String.format(
-              "%s should never be asked to expand;"
-                  + " it is the result of deserializing an already-constructed Pipeline",
-              getClass().getSimpleName()));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("urn", getUrn())
-          .add("payload", getSpec())
-          .toString();
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec migrate(SdkComponents components) {
-      for (Coder<?> coder : getCoders()) {
-        try {
-          components.registerCoder(coder);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      return getSpec();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index e9168a2..4b14c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -44,8 +44,8 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransformTranslation} into {@link ReadPayload} protos.
+ * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} {@link PTransform
+ * PTransformTranslation} into {@link ReadPayload} protos.
  */
 public class ReadTranslation {
   private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
@@ -89,13 +89,9 @@ public class ReadTranslation {
   public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload)
       throws InvalidProtocolBufferException {
     checkArgument(payload.getIsBounded().equals(IsBounded.Enum.BOUNDED));
-    return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray(
-        payload
-            .getSource()
-            .getSpec()
-            .getPayload()
-            .toByteArray(),
-        "BoundedSource");
+    return (BoundedSource<?>)
+        SerializableUtils.deserializeFromByteArray(
+            payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
   }
 
   public static <T> BoundedSource<T> boundedSourceFromTransform(
@@ -136,13 +132,9 @@ public class ReadTranslation {
   public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload)
       throws InvalidProtocolBufferException {
     checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED));
-    return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray(
-        payload
-            .getSource()
-            .getSpec()
-            .getPayload()
-            .toByteArray(),
-        "BoundedSource");
+    return (UnboundedSource<?, ?>)
+        SerializableUtils.deserializeFromByteArray(
+            payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource");
   }
 
   public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {
@@ -161,11 +153,10 @@ public class ReadTranslation {
     }
   }
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */
   public static class UnboundedReadPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Read.Unbounded<?>> {
     public static TransformPayloadTranslator create() {
       return new UnboundedReadPayloadTranslator();
     }
@@ -188,11 +179,10 @@ public class ReadTranslation {
     }
   }
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link Read.Bounded}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */
   public static class BoundedReadPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Read.Bounded<?>> {
     public static TransformPayloadTranslator create() {
       return new BoundedReadPayloadTranslator();
     }
@@ -226,5 +216,10 @@ public class ReadTranslation {
           .put(Read.Bounded.class, new BoundedReadPayloadTranslator())
           .build();
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index de4d6bb..8e4c1db 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -172,7 +172,8 @@ public class TestStreamTranslation {
     }
   }
 
-  static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
+  static class TestStreamTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
       return TEST_STREAM_TRANSFORM_URN;
@@ -197,5 +198,10 @@ public class TestStreamTranslation {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index 3b3ffa1..58417a8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -26,4 +26,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 public interface TransformPayloadTranslatorRegistrar {
   Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
       getTransformPayloadTranslators();
+
+  Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index ad6177d..9158aba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -41,7 +41,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
  */
 public class WindowIntoTranslation {
 
-  static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> {
+  static class WindowAssignTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<Window.Assign<?>> {
 
     @Override
     public String getUrn(Assign<?> transform) {
@@ -105,11 +106,10 @@ public class WindowIntoTranslation {
         getWindowIntoPayload(application).getWindowFn());
   }
 
-  /**
-   * A {@link TransformPayloadTranslator} for {@link Window}.
-   */
+  /** A {@link TransformPayloadTranslator} for {@link Window}. */
   public static class WindowIntoPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> {
+      extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
+          Window.Assign<?>> {
     public static TransformPayloadTranslator create() {
       return new WindowIntoPayloadTranslator();
     }
@@ -140,5 +140,10 @@ public class WindowIntoTranslation {
         getTransformPayloadTranslators() {
       return Collections.singletonMap(Window.Assign.class, new WindowIntoPayloadTranslator());
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 5a49747..645b562 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
@@ -173,10 +174,11 @@ public class WriteFilesTranslation {
             .getPayload());
   }
 
-  static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
+  static class WriteFilesTranslator
+      extends TransformPayloadTranslator.WithDefaultRehydration<WriteFiles<?, ?, ?>> {
     @Override
     public String getUrn(WriteFiles<?, ?, ?> transform) {
-      return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+      return WRITE_FILES_TRANSFORM_URN;
     }
 
     @Override
@@ -193,9 +195,15 @@ public class WriteFilesTranslation {
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Registrar implements TransformPayloadTranslatorRegistrar {
     @Override
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+    public Map<Class<? extends PTransform>, TransformPayloadTranslator>
         getTransformPayloadTranslators() {
-      return Collections.singletonMap(WriteFiles.class, new WriteFilesTranslator());
+      return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap(
+          WriteFiles.class, new WriteFilesTranslator());
+    }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9cfa79f..099252f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -35,14 +35,13 @@ import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -115,6 +114,11 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
               new SplittableParDoProcessElementsTranslator())
           .build();
     }
+
+    @Override
+    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 
   /**
@@ -122,7 +126,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoProcessElementsTranslator
-      implements TransformPayloadTranslator<ProcessElements<?, ?, ?, ?>> {
+      extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?>> {
 
     private SplittableParDoProcessElementsTranslator() {}
 
@@ -130,14 +134,6 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
     public String getUrn(ProcessElements<?, ?, ?, ?> transform) {
       return SPLITTABLE_PROCESS_URN;
     }
-
-    @Override
-    public FunctionSpec translate(
-        AppliedPTransform<?, ?, ProcessElements<?, ?, ?, ?>> transform, SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format("%s should never be translated",
-          ProcessElements.class.getCanonicalName()));
-    }
   }
 
   // the TransformEvaluatorFactories can construct instances of all generic types of transform,

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index e77dbc8..7840c32 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -219,11 +219,6 @@
     <!-- Beam -->
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-model-pipeline</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d1e2d57..cec01f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -31,12 +31,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
@@ -56,7 +54,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -1085,7 +1082,7 @@ class FlinkStreamingTransformTranslators {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoProcessElementsTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
       SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
 
     private SplittableParDoProcessElementsTranslator() {}
@@ -1094,17 +1091,6 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
       return SPLITTABLE_PROCESS_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
-    }
   }
 
   /** Registers classes specialized to the Flink runner. */
@@ -1128,6 +1114,11 @@ class FlinkStreamingTransformTranslators {
               new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
           .build();
     }
+
+    @Override
+    public Map<String, PTransformTranslation.TransformPayloadTranslator> getTransformRehydrators() {
+      return Collections.emptyMap();
+    }
   }
 
   /**
@@ -1135,7 +1126,7 @@ class FlinkStreamingTransformTranslators {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoProcessElementsPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
       SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> {
 
     private SplittableParDoProcessElementsPayloadTranslator() {}
@@ -1144,17 +1135,6 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) {
       return SplittableParDo.SPLITTABLE_PROCESS_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName()));
-    }
   }
 
   /**
@@ -1162,7 +1142,7 @@ class FlinkStreamingTransformTranslators {
    * once SDF is reorganized appropriately.
    */
   private static class SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
       SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> {
 
     private SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator() {}
@@ -1171,24 +1151,13 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?> transform) {
       return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class.getCanonicalName()));
-    }
   }
 
   /**
    * A translator just to vend the URN.
    */
   private static class CreateStreamingFlinkViewPayloadTranslator
-      implements PTransformTranslation.TransformPayloadTranslator<
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
           CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> {
 
     private CreateStreamingFlinkViewPayloadTranslator() {}
@@ -1197,16 +1166,5 @@ class FlinkStreamingTransformTranslators {
     public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?> transform) {
       return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
     }
-
-    @Override
-    public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>>
-            transform,
-        SdkComponents components) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s should never be translated",
-              CreateStreamingFlinkView.CreateFlinkPCollectionView.class.getCanonicalName()));
-    }
   }
 }


[06/14] beam git commit: Add custom rehydration for TestStream

Posted by ke...@apache.org.
Add custom rehydration for TestStream


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6abf6f52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6abf6f52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6abf6f52

Branch: refs/heads/master
Commit: 6abf6f520df9efd5950063019bbc33ddc85a5c97
Parents: 10c63e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 20:20:53 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../construction/TestStreamTranslation.java     | 171 +++++++++++++++----
 .../construction/TestStreamTranslationTest.java |   4 +-
 2 files changed, 142 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 8e4c1db..1b18844 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,12 +22,14 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,21 +49,74 @@ import org.joda.time.Instant;
  */
 public class TestStreamTranslation {
 
-  static <T> RunnerApi.TestStreamPayload testStreamToPayload(
-      TestStream<T> transform, SdkComponents components) throws IOException {
-    String coderId = components.registerCoder(transform.getValueCoder());
+  private interface TestStreamLike {
+    Coder<?> getValueCoder();
 
-    RunnerApi.TestStreamPayload.Builder builder =
-        RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId);
+    List<RunnerApi.TestStreamPayload.Event> getEvents();
+  }
+
+  @VisibleForTesting
+  static class RawTestStream<T> extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>>
+      implements TestStreamLike {
+
+    private final transient RehydratedComponents rehydratedComponents;
+    private final RunnerApi.TestStreamPayload payload;
+    private final Coder<T> valueCoder;
+    private final RunnerApi.FunctionSpec spec;
+
+    public RawTestStream(
+        RunnerApi.TestStreamPayload payload, RehydratedComponents rehydratedComponents) {
+      this.payload = payload;
+      this.spec =
+          RunnerApi.FunctionSpec.newBuilder()
+              .setUrn(TEST_STREAM_TRANSFORM_URN)
+              .setPayload(payload.toByteString())
+              .build();
+      this.rehydratedComponents = rehydratedComponents;
+
+      // Eagerly extract the coder to throw a good exception here
+      try {
+        this.valueCoder = (Coder<T>) rehydratedComponents.getCoder(payload.getCoderId());
+      } catch (IOException exc) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failure extracting coder with id '%s' for %s",
+                payload.getCoderId(), TestStream.class.getSimpleName()),
+            exc);
+      }
+    }
+
+    @Override
+    public String getUrn() {
+      return TEST_STREAM_TRANSFORM_URN;
+    }
+
+    @Nonnull
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return spec;
+    }
 
-    for (TestStream.Event<T> event : transform.getEvents()) {
-      builder.addEvents(toProto(event, transform.getValueCoder()));
+    @Override
+    public RunnerApi.FunctionSpec migrate(SdkComponents components) throws IOException {
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(TEST_STREAM_TRANSFORM_URN)
+          .setPayload(payloadForTestStreamLike(this, components).toByteString())
+          .build();
     }
 
-    return builder.build();
+    @Override
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
+    @Override
+    public List<RunnerApi.TestStreamPayload.Event> getEvents() {
+      return payload.getEventsList();
+    }
   }
 
-  private static TestStream<?> fromProto(
+  private static TestStream<?> testStreamFromProtoPayload(
       RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components)
       throws IOException {
 
@@ -70,7 +125,7 @@ public class TestStreamTranslation {
     List<TestStream.Event<Object>> events = new ArrayList<>();
 
     for (RunnerApi.TestStreamPayload.Event event : testStreamPayload.getEventsList()) {
-      events.add(fromProto(event, coder));
+      events.add(eventFromProto(event, coder));
     }
     return TestStream.fromRawEvents(coder, events);
   }
@@ -98,12 +153,12 @@ public class TestStreamTranslation {
         RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload());
 
     return (TestStream<T>)
-        fromProto(
+        testStreamFromProtoPayload(
             testStreamPayload, RehydratedComponents.forComponents(sdkComponents.toComponents()));
   }
 
-  static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)
-      throws IOException {
+  static <T> RunnerApi.TestStreamPayload.Event eventToProto(
+      TestStream.Event<T> event, Coder<T> coder) throws IOException {
     switch (event.getType()) {
       case WATERMARK:
         return RunnerApi.TestStreamPayload.Event.newBuilder()
@@ -143,7 +198,7 @@ public class TestStreamTranslation {
     }
   }
 
-  static <T> TestStream.Event<T> fromProto(
+  static <T> TestStream.Event<T> eventFromProto(
       RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws IOException {
     switch (protoEvent.getEventCase()) {
       case WATERMARK_EVENT:
@@ -172,8 +227,8 @@ public class TestStreamTranslation {
     }
   }
 
-  static class TestStreamTranslator
-      extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> {
+  /** A translator registered to translate {@link TestStream} objects to protobuf representation. */
+  static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
       return TEST_STREAM_TRANSFORM_URN;
@@ -181,27 +236,81 @@ public class TestStreamTranslation {
 
     @Override
     public RunnerApi.FunctionSpec translate(
-        AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components)
+        final AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents components)
         throws IOException {
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(getUrn(transform.getTransform()))
-          .setPayload(testStreamToPayload(transform.getTransform(), components).toByteString())
-          .build();
+      return translateTyped(transform.getTransform(), components);
     }
-  }
 
-  /** Registers {@link TestStreamTranslator}. */
-  @AutoService(TransformPayloadTranslatorRegistrar.class)
-  public static class Registrar implements TransformPayloadTranslatorRegistrar {
     @Override
-    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
-        getTransformPayloadTranslators() {
-      return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
+    public PTransformTranslation.RawPTransform<?, ?> rehydrate(
+        RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents)
+        throws IOException {
+      checkArgument(
+          protoTransform.getSpec() != null,
+          "%s received transform with null spec",
+          getClass().getSimpleName());
+      checkArgument(protoTransform.getSpec().getUrn().equals(TEST_STREAM_TRANSFORM_URN));
+      return new RawTestStream<>(
+          RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()),
+          rehydratedComponents);
     }
 
-    @Override
-    public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
-      return Collections.emptyMap();
+    private <T> RunnerApi.FunctionSpec translateTyped(
+        final TestStream<T> testStream, SdkComponents components) throws IOException {
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(TEST_STREAM_TRANSFORM_URN)
+          .setPayload(payloadForTestStream(testStream, components).toByteString())
+          .build();
     }
+
+    /** Registers {@link TestStreamTranslator}. */
+    @AutoService(TransformPayloadTranslatorRegistrar.class)
+    public static class Registrar implements TransformPayloadTranslatorRegistrar {
+      @Override
+      public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+          getTransformPayloadTranslators() {
+        return Collections.singletonMap(TestStream.class, new TestStreamTranslator());
+      }
+
+      @Override
+      public Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators() {
+        return Collections.singletonMap(TEST_STREAM_TRANSFORM_URN, new TestStreamTranslator());
+      }
+    }
+  }
+
+  /** Produces a {@link RunnerApi.TestStreamPayload} from a portable {@link RawTestStream}. */
+  static RunnerApi.TestStreamPayload payloadForTestStreamLike(
+      TestStreamLike transform, SdkComponents components) throws IOException {
+    return RunnerApi.TestStreamPayload.newBuilder()
+        .setCoderId(components.registerCoder(transform.getValueCoder()))
+        .addAllEvents(transform.getEvents())
+        .build();
+  }
+
+  @VisibleForTesting
+  static <T> RunnerApi.TestStreamPayload payloadForTestStream(
+      final TestStream<T> testStream, SdkComponents components) throws IOException {
+    return payloadForTestStreamLike(
+        new TestStreamLike() {
+          @Override
+          public Coder<T> getValueCoder() {
+            return testStream.getValueCoder();
+          }
+
+          @Override
+          public List<RunnerApi.TestStreamPayload.Event> getEvents() {
+            try {
+              List<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<>();
+              for (TestStream.Event<T> event : testStream.getEvents()) {
+                protoEvents.add(eventToProto(event, testStream.getValueCoder()));
+              }
+              return protoEvents;
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        components);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6abf6f52/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
index 3678fc7..fc30552 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -81,7 +81,7 @@ public class TestStreamTranslationTest {
     public void testEncodedProto() throws Exception {
       SdkComponents components = SdkComponents.create();
       RunnerApi.TestStreamPayload payload =
-          TestStreamTranslation.testStreamToPayload(testStream, components);
+          TestStreamTranslation.payloadForTestStream(testStream, components);
 
       verifyTestStreamEncoding(
           testStream, payload, RehydratedComponents.forComponents(components.toComponents()));
@@ -122,7 +122,7 @@ public class TestStreamTranslationTest {
 
       for (int i = 0; i < payload.getEventsList().size(); ++i) {
         assertThat(
-            TestStreamTranslation.fromProto(payload.getEvents(i), testStream.getValueCoder()),
+            TestStreamTranslation.eventFromProto(payload.getEvents(i), testStream.getValueCoder()),
             equalTo(testStream.getEvents().get(i)));
       }
     }


[10/14] beam git commit: Add NotSerializable.forUrn to key by URN for non-serializable overrides

Posted by ke...@apache.org.
Add NotSerializable.forUrn to key by URN for non-serializable overrides


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d684ca09
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d684ca09
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d684ca09

Branch: refs/heads/master
Commit: d684ca096b524202b0d64e1b4ab1e472528bb3a5
Parents: 5bc77fc
Author: Kenneth Knowles <ke...@apache.org>
Authored: Tue Oct 17 12:41:42 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/PTransformTranslation.java  | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d684ca09/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 785b9e4..8e6829b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -297,6 +297,16 @@ public class PTransformTranslation {
      */
     abstract class NotSerializable<T extends PTransform<?, ?>>
         implements TransformPayloadTranslator<T> {
+
+      public static NotSerializable<?> forUrn(final String urn) {
+        return new NotSerializable<PTransform<?, ?>>() {
+          @Override
+          public String getUrn(PTransform<?, ?> transform) {
+            return urn;
+          }
+        };
+      }
+
       @Override
       public final FunctionSpec translate(
           AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException {


[11/14] beam git commit: Support side inputs in CombineTranslation

Posted by ke...@apache.org.
Support side inputs in CombineTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bc77fcf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bc77fcf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bc77fcf

Branch: refs/heads/master
Commit: 5bc77fcf619dc6f1272d1cd4143b6a09e0cfbda1
Parents: 11368e0
Author: Kenneth Knowles <ke...@apache.org>
Authored: Tue Oct 17 11:50:46 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/CombineTranslation.java     | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5bc77fcf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 21796aa..ff431fc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.CO
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -49,6 +48,7 @@ import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * Methods for translating between {@link Combine.PerKey} {@link PTransform PTransforms} and {@link
@@ -170,8 +170,12 @@ public class CombineTranslation {
 
           @Override
           public Map<String, SideInput> getSideInputs() {
-            // TODO: support side inputs
-            return ImmutableMap.of();
+            Map<String, SideInput> sideInputs = new HashMap<>();
+            for (PCollectionView<?> sideInput : combine.getTransform().getSideInputs()) {
+              sideInputs.put(
+                  sideInput.getTagInternal().getId(), ParDoTranslation.toProto(sideInput));
+            }
+            return sideInputs;
           }
         },
         components);


[04/14] beam git commit: Add RawPTransform.migrate(SdkComponents) for re-serialization

Posted by ke...@apache.org.
Add RawPTransform.migrate(SdkComponents) for re-serialization


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14455ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14455ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14455ef

Branch: refs/heads/master
Commit: c14455ef4209a14a62f7b18f604c9673b32d06d7
Parents: 020ef14
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 20:28:16 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     | 98 +++++++++-----------
 .../core/construction/PipelineTranslation.java  | 36 ++++---
 .../core/construction/SplittableParDo.java      |  8 ++
 .../core/SplittableParDoViaKeyedWorkItems.java  |  7 ++
 .../beam/runners/direct/DirectGroupByKey.java   | 14 +++
 .../beam/runners/direct/MultiStepCombine.java   | 18 +++-
 .../direct/ParDoMultiOverrideFactory.java       |  7 ++
 .../direct/TestStreamEvaluatorFactory.java      |  7 ++
 .../runners/direct/ViewOverrideFactory.java     |  8 ++
 9 files changed, 129 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index f9e7837..31767a0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,8 +65,8 @@ public class PTransformTranslation {
   public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
 
   /**
-   * @deprecated runners should move away from translating `CreatePCollectionView` and treat this
-   * as part of the translation for a `ParDo` side input.
+   * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
+   *     part of the translation for a `ParDo` side input.
    */
   @Deprecated
   public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
@@ -85,8 +84,8 @@ public class PTransformTranslation {
       Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
           (Map) registrar.getTransformPayloadTranslators();
 
-      Set<Class<? extends PTransform>> alreadyRegistered = Sets.intersection(
-          translators.keySet(), newTranslators.keySet());
+      Set<Class<? extends PTransform>> alreadyRegistered =
+          Sets.intersection(translators.keySet(), newTranslators.keySet());
 
       if (!alreadyRegistered.isEmpty()) {
         throw new IllegalArgumentException(
@@ -143,20 +142,13 @@ public class PTransformTranslation {
         DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
 
     PTransform<?, ?> transform = appliedPTransform.getTransform();
+
     // A RawPTransform directly vends its payload. Because it will generally be
     // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
     if (transform instanceof RawPTransform) {
-      RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform;
-
-      if (rawPTransform.getUrn() != null) {
-        FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
-        @Nullable ByteString parameter = rawPTransform.getPayload();
-        if (parameter != null) {
-          payload.setPayload(parameter);
-        }
-        transformBuilder.setSpec(payload);
-      }
-      rawPTransform.registerComponents(components);
+      // The raw transform was parsed in the context of other components; this puts it in the
+      // context of our current serialization
+      transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components));
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
       FunctionSpec payload =
           KNOWN_PAYLOAD_TRANSLATORS
@@ -186,9 +178,7 @@ public class PTransformTranslation {
     return tag.getId();
   }
 
-  /**
-   * Returns the URN for the transform if it is known, otherwise {@code null}.
-   */
+  /** Returns the URN for the transform if it is known, otherwise {@code null}. */
   @Nullable
   public static String urnForTransformOrNull(PTransform<?, ?> transform) {
 
@@ -205,9 +195,7 @@ public class PTransformTranslation {
     return translator.getUrn(transform);
   }
 
-  /**
-   * Returns the URN for the transform if it is known, otherwise throws.
-   */
+  /** Returns the URN for the transform if it is known, otherwise throws. */
   public static String urnForTransform(PTransform<?, ?> transform) {
     String urn = urnForTransformOrNull(transform);
     if (urn == null) {
@@ -235,24 +223,48 @@ public class PTransformTranslation {
    * #expand} method since the definition of the transform may be lost. The transform is already
    * fully expanded in the pipeline proto.
    */
-  public abstract static class RawPTransform<
-          InputT extends PInput, OutputT extends POutput>
+  public abstract static class RawPTransform<InputT extends PInput, OutputT extends POutput>
       extends PTransform<InputT, OutputT> {
 
+    /** The URN for this transform, if standardized. */
     @Nullable
-    public abstract String getUrn();
+    public String getUrn() {
+      return getSpec() == null ? null : getSpec().getUrn();
+    }
 
+    /** The payload for this transform, if any. */
     @Nullable
-    public ByteString getPayload() {
-      return null;
+    public abstract FunctionSpec getSpec();
+
+    /**
+     * Build a new payload set in the context of the given {@link SdkComponents}, if applicable.
+     *
+     * <p>When re-serializing this transform, the ids referenced in the rehydrated payload may
+     * conflict with those defined by the serialization context. In that case, the components must
+     * be re-registered and a new payload returned.
+     */
+    public FunctionSpec migrate(SdkComponents components) throws IOException {
+      return getSpec();
     }
 
-    public void registerComponents(SdkComponents components) {}
+    /**
+     * By default, throws an exception, but can be overridden.
+     *
+     * <p>It is permissible for runner-specific transforms to be both a {@link RawPTransform} that
+     * directly vends its proto representation and also to expand, for convenience of not having to
+     * register a translator.
+     */
+    @Override
+    public OutputT expand(InputT input) {
+      throw new IllegalStateException(
+          String.format(
+              "%s should never be asked to expand;"
+                  + " it is the result of deserializing an already-constructed Pipeline",
+              getClass().getSimpleName()));
+    }
   }
 
-  /**
-   * A translator that uses the explicit URN and payload from a {@link RawPTransform}.
-   */
+  /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
   public static class RawPTransformTranslator
       implements TransformPayloadTranslator<RawPTransform<?, ?>> {
     @Override
@@ -262,27 +274,9 @@ public class PTransformTranslation {
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, RawPTransform<?, ?>> transform,
-        SdkComponents components) {
-
-      // Anonymous composites have no spec
-      if (transform.getTransform().getUrn() == null) {
-        return null;
-      }
-
-      FunctionSpec.Builder transformSpec =
-          FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
-
-      ByteString payload = transform.getTransform().getPayload();
-      if (payload != null) {
-        transformSpec.setPayload(payload);
-      }
-
-      // Transforms like Combine may have Coders that need to be added but do not
-      // occur in a black-box traversal
-      transform.getTransform().registerComponents(components);
-
-      return transformSpec.build();
+        AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents components)
+        throws IOException {
+      return transform.getTransform().migrate(components);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 0aca837..1624865 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -24,7 +24,6 @@ import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -105,8 +104,7 @@ public class PipelineTranslation {
     return DisplayData.from(component);
   }
 
-  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
-      throws IOException {
+  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) throws IOException {
     TransformHierarchy transforms = new TransformHierarchy();
     Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
 
@@ -184,11 +182,7 @@ public class PipelineTranslation {
     }
 
     RehydratedPTransform transform =
-        RehydratedPTransform.of(
-            transformSpec.getUrn(),
-            transformSpec.getPayload(),
-            additionalInputs,
-            additionalCoders);
+        RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders);
 
     if (isPrimitive(transformProto)) {
       transforms.addFinalizedPrimitiveNode(
@@ -234,32 +228,33 @@ public class PipelineTranslation {
   private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
     return transformProto.getSubtransformsCount() == 0
         && !transformProto
-        .getInputsMap()
-        .values()
-        .containsAll(transformProto.getOutputsMap().values());
+            .getInputsMap()
+            .values()
+            .containsAll(transformProto.getOutputsMap().values());
   }
 
   @AutoValue
   abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
 
     @Nullable
-    public abstract String getUrn();
-
-    @Nullable
-    public abstract ByteString getPayload();
+    public abstract RunnerApi.FunctionSpec getSpec();
 
     @Override
     public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
 
     public abstract List<Coder<?>> getCoders();
 
+    @Override
+    public String getUrn() {
+      return getSpec().getUrn();
+    }
+
     public static RehydratedPTransform of(
-        String urn,
-        ByteString payload,
+        RunnerApi.FunctionSpec payload,
         Map<TupleTag<?>, PValue> additionalInputs,
         List<Coder<?>> additionalCoders) {
       return new AutoValue_PipelineTranslation_RehydratedPTransform(
-          urn, payload, additionalInputs, additionalCoders);
+          payload, additionalInputs, additionalCoders);
     }
 
     @Override
@@ -275,12 +270,12 @@ public class PipelineTranslation {
     public String toString() {
       return MoreObjects.toStringHelper(this)
           .add("urn", getUrn())
-          .add("payload", getPayload())
+          .add("payload", getSpec())
           .toString();
     }
 
     @Override
-    public void registerComponents(SdkComponents components) {
+    public RunnerApi.FunctionSpec migrate(SdkComponents components) {
       for (Coder<?> coder : getCoders()) {
         try {
           components.registerCoder(coder);
@@ -288,6 +283,7 @@ public class PipelineTranslation {
           throw new RuntimeException(e);
         }
       }
+      return getSpec();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 32d3409..ab66e84 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -295,6 +297,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     public String getUrn() {
       return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 251260e..400df19 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -87,6 +88,12 @@ public class SplittableParDoViaKeyedWorkItems {
     public String getUrn() {
       return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
     }
+
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      throw new UnsupportedOperationException(
+          String.format("%s should never be serialized to proto", getClass().getSimpleName()));
+    }
   }
 
   /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 3ba04e7..9e56b65 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -92,6 +94,12 @@ class DirectGroupByKey<K, V>
     public String getUrn() {
       return DIRECT_GBKO_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
@@ -141,5 +149,11 @@ class DirectGroupByKey<K, V>
     public String getUrn() {
       return DIRECT_GABW_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index ae21b4d..5253ef5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -28,7 +28,9 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CombineTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -177,12 +179,18 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
     this.outputCoder = outputCoder;
   }
 
-  @Nullable
+  @Nonnull
   @Override
   public String getUrn() {
     return "urn:beam:directrunner:transforms:multistepcombine:v1";
   }
 
+  @Nullable
+  @Override
+  public RunnerApi.FunctionSpec getSpec() {
+    return null;
+  }
+
   @Override
   public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
     checkArgument(
@@ -337,11 +345,17 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
           input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), outputCoder);
     }
 
-    @Nullable
+    @Nonnull
     @Override
     public String getUrn() {
       return DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   static class MergeAndExtractAccumulatorOutputEvaluatorFactory

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 26f30b0..5ec52be 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
@@ -261,6 +262,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     public String getUrn() {
       return DIRECT_STATEFUL_PAR_DO_URN;
     }
+
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      throw new UnsupportedOperationException(
+          String.format("%s should never be serialized to proto", getClass().getSimpleName()));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 49e7be7..d62b64c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.TestStreamTranslation;
@@ -218,6 +219,12 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       public String getUrn() {
         return DIRECT_TEST_STREAM_URN;
       }
+
+      @Nullable
+      @Override
+      public RunnerApi.FunctionSpec getSpec() {
+        return null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index c2255fe..61b7978 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
 
 import java.io.IOException;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -128,6 +130,12 @@ class ViewOverrideFactory<ElemT, ViewT>
     public String getUrn() {
       return DIRECT_WRITE_VIEW_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   public static final String DIRECT_WRITE_VIEW_URN =