You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/08/09 01:19:17 UTC

[1/2] beam git commit: Use bytes instead of Any in RunnerApi.FunctionSpec

Repository: beam
Updated Branches:
  refs/heads/master f5714f220 -> 9ed2cf41f


Use bytes instead of Any in RunnerApi.FunctionSpec

Keep a "any" field, renamed to any_param.

Rename `parameter` to `payload`


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b9b0504
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b9b0504
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b9b0504

Branch: refs/heads/master
Commit: 2b9b05049822a22154ac3c2f6b593061f42b54c1
Parents: f5714f2
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 5 11:22:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Aug 8 18:18:59 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CoderTranslation.java     | 16 +---
 .../core/construction/CombineTranslation.java   | 28 ++----
 .../CreatePCollectionViewTranslation.java       | 17 +---
 .../construction/PTransformTranslation.java     | 12 +--
 .../core/construction/ParDoTranslation.java     | 58 +++++-------
 .../core/construction/PipelineTranslation.java  | 12 +--
 .../core/construction/ReadTranslation.java      | 56 +++++------
 .../construction/TestStreamTranslation.java     |  5 +-
 .../construction/WindowIntoTranslation.java     |  9 +-
 .../WindowingStrategyTranslation.java           | 97 ++++++++------------
 .../construction/WriteFilesTranslation.java     | 29 +++---
 .../CreatePCollectionViewTranslationTest.java   |  6 +-
 .../core/construction/ParDoTranslationTest.java |  3 +-
 .../construction/TestStreamTranslationTest.java |  4 +-
 .../src/main/proto/beam_runner_api.proto        |  5 +-
 .../beam/fn/harness/BeamFnDataReadRunner.java   |  4 +-
 .../beam/fn/harness/BeamFnDataWriteRunner.java  |  4 +-
 .../beam/fn/harness/BoundedSourceRunner.java    | 10 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +-
 .../fn/harness/BeamFnDataReadRunnerTest.java    |  5 +-
 .../fn/harness/BeamFnDataWriteRunnerTest.java   |  5 +-
 .../fn/harness/BoundedSourceRunnerTest.java     | 18 ++--
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    | 13 +--
 sdks/python/apache_beam/coders/coders.py        | 11 +--
 .../runners/portability/fn_api_runner.py        | 78 ++++++++++------
 .../runners/worker/bundle_processor.py          | 21 ++---
 sdks/python/apache_beam/transforms/core.py      | 10 +-
 .../python/apache_beam/transforms/ptransform.py |  6 +-
 sdks/python/apache_beam/utils/proto_utils.py    | 11 +++
 sdks/python/apache_beam/utils/urns.py           |  6 +-
 30 files changed, 250 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
index a6719ff..2246f81 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
@@ -24,9 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -138,13 +136,9 @@ public class CoderTranslation {
                 .setSpec(
                     FunctionSpec.newBuilder()
                         .setUrn(JAVA_SERIALIZED_CODER_URN)
-                        .setParameter(
-                            Any.pack(
-                                BytesValue.newBuilder()
-                                    .setValue(
-                                        ByteString.copyFrom(
-                                            SerializableUtils.serializeToByteArray(coder)))
-                                    .build()))))
+                        .setPayload(
+                            ByteString.copyFrom(SerializableUtils.serializeToByteArray(coder)))
+                        .build()))
         .build();
   }
 
@@ -182,9 +176,7 @@ public class CoderTranslation {
             protoCoder
                 .getSpec()
                 .getSpec()
-                .getParameter()
-                .unpack(BytesValue.class)
-                .getValue()
+                .getPayload()
                 .toByteArray(),
             "Custom Coder Bytes");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index d909ccf..17c48dc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -23,9 +23,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.CO
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.Iterables;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -79,7 +77,7 @@ public class CombineTranslation {
       CombinePayload payload = toProto(transform, components);
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(COMBINE_TRANSFORM_URN)
-          .setParameter(Any.pack(payload))
+          .setPayload(payload.toByteString())
           .build();
     }
 
@@ -138,13 +136,9 @@ public class CombineTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(combineFn)))
-                            .build())))
+                .setPayload(
+                    ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
+                .build())
         .build();
   }
 
@@ -171,9 +165,7 @@ public class CombineTranslation {
             payload
                 .getCombineFn()
                 .getSpec()
-                .getParameter()
-                .unpack(BytesValue.class)
-                .getValue()
+                .getPayload()
                 .toByteArray(),
             "CombineFn");
   }
@@ -190,10 +182,10 @@ public class CombineTranslation {
 
   private static CombinePayload getCombinePayload(
       AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws IOException {
-    return PTransformTranslation.toProto(
-            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components)
-        .getSpec()
-        .getParameter()
-        .unpack(CombinePayload.class);
+    return CombinePayload.parseFrom(
+        PTransformTranslation.toProto(
+                transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components)
+            .getSpec()
+            .getPayload());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index c67d688..1027ea2 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -21,9 +21,7 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.service.AutoService;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -79,9 +77,7 @@ public class CreatePCollectionViewTranslation {
         SerializableUtils.deserializeFromByteArray(
             transformProto
                 .getSpec()
-                .getParameter()
-                .unpack(BytesValue.class)
-                .getValue()
+                .getPayload()
                 .toByteArray(),
             PCollectionView.class.getSimpleName());
   }
@@ -104,14 +100,9 @@ public class CreatePCollectionViewTranslation {
         SdkComponents components) {
       return FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setParameter(
-              Any.pack(
-                  BytesValue.newBuilder()
-                      .setValue(
-                          ByteString.copyFrom(
-                              SerializableUtils.serializeToByteArray(
-                                  transform.getTransform().getView())))
-                      .build()))
+          .setPayload(
+              ByteString.copyFrom(
+                  SerializableUtils.serializeToByteArray(transform.getTransform().getView())))
           .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index b8365c9..4bfe17a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -131,9 +131,9 @@ public class PTransformTranslation {
 
       if (rawPTransform.getUrn() != null) {
         FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
-        @Nullable Any parameter = rawPTransform.getPayload();
+        @Nullable ByteString parameter = rawPTransform.getPayload();
         if (parameter != null) {
-          payload.setParameter(parameter);
+          payload.setPayload(parameter);
         }
         transformBuilder.setSpec(payload);
       }
@@ -224,7 +224,7 @@ public class PTransformTranslation {
     public abstract String getUrn();
 
     @Nullable
-    public Any getPayload() {
+    public ByteString getPayload() {
       return null;
     }
 
@@ -254,9 +254,9 @@ public class PTransformTranslation {
       FunctionSpec.Builder transformSpec =
           FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
 
-      Any payload = transform.getTransform().getPayload();
+      ByteString payload = transform.getTransform().getPayload();
       if (payload != null) {
-        transformSpec.setParameter(payload);
+        transformSpec.setPayload(payload);
       }
 
       // Transforms like Combine may have Coders that need to be added but do not

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5765c51..6ae95e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -29,9 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -122,7 +120,7 @@ public class ParDoTranslation {
       ParDoPayload payload = toProto(transform.getTransform(), components);
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(PAR_DO_TRANSFORM_URN)
-          .setParameter(Any.pack(payload))
+          .setPayload(payload.toByteString())
           .build();
     }
 
@@ -240,7 +238,7 @@ public class ParDoTranslation {
     RunnerApi.PTransform protoTransform =
         PTransformTranslation.toProto(application, SdkComponents.create());
 
-    ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    ParDoPayload payload = ParDoPayload.parseFrom(protoTransform.getSpec().getPayload());
     TupleTag<?> mainOutputTag = getMainOutputTag(payload);
     Set<String> outputTags =
         Sets.difference(
@@ -259,7 +257,7 @@ public class ParDoTranslation {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PTransform parDoProto =
         PTransformTranslation.toProto(application, sdkComponents);
-    ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+    ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload());
 
     List<PCollectionView<?>> views = new ArrayList<>();
     RehydratedComponents components =
@@ -289,7 +287,7 @@ public class ParDoTranslation {
         ptransform.getSpec().getUrn().equals(PAR_DO_TRANSFORM_URN),
         "Unexpected payload type %s",
         ptransform.getSpec().getUrn());
-    ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    ParDoPayload payload = ParDoPayload.parseFrom(ptransform.getSpec().getPayload());
     String mainInputId =
         Iterables.getOnlyElement(
             Sets.difference(
@@ -377,7 +375,7 @@ public class ParDoTranslation {
         Combine.CombineFn<?, ?, ?> combineFn =
             (Combine.CombineFn<?, ?, ?>)
                 SerializableUtils.deserializeFromByteArray(
-                    combineFnSpec.getParameter().unpack(BytesValue.class).toByteArray(),
+                    combineFnSpec.getPayload().toByteArray(),
                     Combine.CombineFn.class.getSimpleName());
 
         // Rawtype coder cast because it is required to be a valid accumulator coder
@@ -443,14 +441,10 @@ public class ParDoTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(CUSTOM_JAVA_DO_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(
-                                        DoFnAndMainOutput.of(fn, tag))))
-                            .build())))
+                .setPayload(
+                    ByteString.copyFrom(
+                        SerializableUtils.serializeToByteArray(DoFnAndMainOutput.of(fn, tag))))
+                .build())
         .build();
   }
 
@@ -458,7 +452,7 @@ public class ParDoTranslation {
       throws InvalidProtocolBufferException {
     checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
     byte[] serializedFn =
-        fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+        fnSpec.getSpec().getPayload().toByteArray();
     return (DoFnAndMainOutput)
         SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
   }
@@ -542,22 +536,17 @@ public class ParDoTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(CUSTOM_JAVA_VIEW_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
-                            .build())))
+                .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
+                .build())
         .build();
   }
 
   private static <T> ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform)
       throws IOException {
-    return PTransformTranslation.toProto(
-            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
-        .getSpec()
-        .getParameter()
-        .unpack(ParDoPayload.class);
+    RunnerApi.PTransform parDoPTransform =
+        PTransformTranslation.toProto(
+            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create());
+    return ParDoPayload.parseFrom(parDoPTransform.getSpec().getPayload());
   }
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
@@ -580,7 +569,7 @@ public class ParDoTranslation {
         spec.getUrn());
     return (ViewFn<?, ?>)
         SerializableUtils.deserializeFromByteArray(
-            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
+            spec.getPayload().toByteArray(), "Custom ViewFn");
   }
 
   private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
@@ -588,13 +577,9 @@ public class ParDoTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(windowMappingFn)))
-                            .build())))
+                .setPayload(
+                    ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowMappingFn)))
+                .build())
         .build();
   }
 
@@ -608,7 +593,6 @@ public class ParDoTranslation {
         spec.getUrn());
     return (WindowMappingFn<?>)
         SerializableUtils.deserializeFromByteArray(
-            spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
-            "Custom WinodwMappingFn");
+            spec.getPayload().toByteArray(), "Custom WinodwMappingFn");
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 9e4839a..d928338 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -24,7 +24,7 @@ import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -158,7 +158,7 @@ public class PipelineTranslation {
     // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
     if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
       RunnerApi.ParDoPayload payload =
-          transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class);
+          RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
 
       List<PCollectionView<?>> views = new ArrayList<>();
       for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
@@ -182,7 +182,7 @@ public class PipelineTranslation {
     List<Coder<?>> additionalCoders = Collections.emptyList();
     if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
       RunnerApi.CombinePayload payload =
-          transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class);
+          RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload());
       additionalCoders =
           (List)
               Collections.singletonList(
@@ -192,7 +192,7 @@ public class PipelineTranslation {
     RehydratedPTransform transform =
         RehydratedPTransform.of(
             transformSpec.getUrn(),
-            transformSpec.getParameter(),
+            transformSpec.getPayload(),
             additionalInputs,
             additionalCoders);
 
@@ -233,7 +233,7 @@ public class PipelineTranslation {
     public abstract String getUrn();
 
     @Nullable
-    public abstract Any getPayload();
+    public abstract ByteString getPayload();
 
     @Override
     public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
@@ -242,7 +242,7 @@ public class PipelineTranslation {
 
     public static RehydratedPTransform of(
         String urn,
-        Any payload,
+        ByteString payload,
         Map<TupleTag<?>, PValue> additionalInputs,
         List<Coder<?>> additionalCoders) {
       return new AutoValue_PipelineTranslation_RehydratedPTransform(

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index 572384b..06d1074 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -22,9 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.util.Collections;
@@ -83,12 +81,8 @@ public class ReadTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
-                            .build())))
+                .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+                .build())
         .build();
   }
 
@@ -99,9 +93,7 @@ public class ReadTranslation {
         payload
             .getSource()
             .getSpec()
-            .getParameter()
-            .unpack(BytesValue.class)
-            .getValue()
+            .getPayload()
             .toByteArray(),
         "BoundedSource");
   }
@@ -122,11 +114,13 @@ public class ReadTranslation {
   private static <T> ReadPayload getReadPayload(
       AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
       throws IOException {
-    return PTransformTranslation.toProto(
-            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
-        .getSpec()
-        .getParameter()
-        .unpack(ReadPayload.class);
+    return ReadPayload.parseFrom(
+        PTransformTranslation.toProto(
+                transform,
+                Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+                SdkComponents.create())
+            .getSpec()
+            .getPayload());
   }
 
   private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
@@ -134,12 +128,8 @@ public class ReadTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
-                            .build())))
+                .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+                .build())
         .build();
   }
 
@@ -150,9 +140,7 @@ public class ReadTranslation {
         payload
             .getSource()
             .getSpec()
-            .getParameter()
-            .unpack(BytesValue.class)
-            .getValue()
+            .getPayload()
             .toByteArray(),
         "BoundedSource");
   }
@@ -160,13 +148,13 @@ public class ReadTranslation {
   public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {
     try {
       return PCollectionTranslation.fromProto(
-          PTransformTranslation.toProto(
-                  transform,
-                  Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
-                  SdkComponents.create())
-              .getSpec()
-              .getParameter()
-              .unpack(ReadPayload.class)
+          ReadPayload.parseFrom(
+                  PTransformTranslation.toProto(
+                          transform,
+                          Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+                          SdkComponents.create())
+                      .getSpec()
+                      .getPayload())
               .getIsBounded());
     } catch (IOException e) {
       throw new RuntimeException("Internal error determining boundedness of Read", e);
@@ -195,7 +183,7 @@ public class ReadTranslation {
       ReadPayload payload = toProto(transform.getTransform());
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setParameter(Any.pack(payload))
+          .setPayload(payload.toByteString())
           .build();
     }
   }
@@ -222,7 +210,7 @@ public class ReadTranslation {
       ReadPayload payload = toProto(transform.getTransform());
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setParameter(Any.pack(payload))
+          .setPayload(payload.toByteString())
           .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index f23b2ec..cac7cdc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -96,7 +95,7 @@ public class TestStreamTranslation {
         TestStream.class.getSimpleName(),
         transformProto.getSpec().getUrn());
     RunnerApi.TestStreamPayload testStreamPayload =
-        transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
+        RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload());
 
     return (TestStream<T>)
         fromProto(
@@ -185,7 +184,7 @@ public class TestStreamTranslation {
         throws IOException {
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setParameter(Any.pack(testStreamToPayload(transform.getTransform(), components)))
+          .setPayload(testStreamToPayload(transform.getTransform(), components).toByteString())
           .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 6aec908..94ef22d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.service.AutoService;
-import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.util.Collections;
@@ -54,8 +53,8 @@ public class WindowIntoTranslation {
         AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) {
       return FunctionSpec.newBuilder()
           .setUrn("urn:beam:transform:window:v1")
-          .setParameter(
-              Any.pack(WindowIntoTranslation.toProto(transform.getTransform(), components)))
+          .setPayload(
+              WindowIntoTranslation.toProto(transform.getTransform(), components).toByteString())
           .build();
     }
   }
@@ -88,7 +87,7 @@ public class WindowIntoTranslation {
 
     WindowIntoPayload windowIntoPayload;
     try {
-      return transformProto.getSpec().getParameter().unpack(WindowIntoPayload.class);
+      return WindowIntoPayload.parseFrom(transformProto.getSpec().getPayload());
     } catch (InvalidProtocolBufferException exc) {
       throw new IllegalStateException(
           String.format(
@@ -128,7 +127,7 @@ public class WindowIntoTranslation {
       WindowIntoPayload payload = toProto(transform.getTransform(), components);
       return RunnerApi.FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setParameter(Any.pack(payload))
+          .setPayload(payload.toByteString())
           .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 565b552..ab50ea2 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -31,6 +31,9 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.StandardWindowFns;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.FixedWindowsPayload;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.SessionsPayload;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.SlidingWindowsPayload;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -199,77 +202,65 @@ public class WindowingStrategyTranslation implements Serializable {
   public static SdkFunctionSpec toProto(
       WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
     // TODO: Set environment IDs
+    ByteString serializedFn = ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn));
     if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
       return SdkFunctionSpec.newBuilder()
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
-                  .setParameter(
-                      Any.pack(
-                          BytesValue.newBuilder()
-                              .setValue(
-                                  ByteString.copyFrom(
-                                      SerializableUtils.serializeToByteArray(windowFn)))
-                              .build())))
+                  .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
+                  .setPayload(serializedFn)
+                  .build())
           .build();
     } else if (windowFn instanceof GlobalWindows) {
       return SdkFunctionSpec.newBuilder()
           .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
           .build();
     } else if (windowFn instanceof FixedWindows) {
+      FixedWindowsPayload fixedWindowsPayload =
+          FixedWindowsPayload.newBuilder()
+              .setSize(Durations.fromMillis(((FixedWindows) windowFn).getSize().getMillis()))
+              .setOffset(Timestamps.fromMillis(((FixedWindows) windowFn).getOffset().getMillis()))
+              .build();
       return SdkFunctionSpec.newBuilder()
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(FIXED_WINDOWS_FN)
-                  .setParameter(
-                      Any.pack(
-                          StandardWindowFns.FixedWindowsPayload.newBuilder()
-                              .setSize(Durations.fromMillis(
-                                  ((FixedWindows) windowFn).getSize().getMillis()))
-                              .setOffset(Timestamps.fromMillis(
-                                  ((FixedWindows) windowFn).getOffset().getMillis()))
-                              .build())))
+                  .setAnyParam(Any.pack(fixedWindowsPayload))
+                  .setPayload(fixedWindowsPayload.toByteString()))
           .build();
     } else if (windowFn instanceof SlidingWindows) {
+      SlidingWindowsPayload slidingWindowsPayload = SlidingWindowsPayload.newBuilder()
+          .setSize(Durations.fromMillis(((SlidingWindows) windowFn).getSize().getMillis()))
+          .setOffset(Timestamps.fromMillis(((SlidingWindows) windowFn).getOffset().getMillis()))
+          .setPeriod(Durations.fromMillis(((SlidingWindows) windowFn).getPeriod().getMillis()))
+          .build();
       return SdkFunctionSpec.newBuilder()
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(SLIDING_WINDOWS_FN)
-                  .setParameter(
-                      Any.pack(
-                          StandardWindowFns.SlidingWindowsPayload.newBuilder()
-                              .setSize(Durations.fromMillis(
-                                  ((SlidingWindows) windowFn).getSize().getMillis()))
-                              .setOffset(Timestamps.fromMillis(
-                                  ((SlidingWindows) windowFn).getOffset().getMillis()))
-                              .setPeriod(Durations.fromMillis(
-                                  ((SlidingWindows) windowFn).getPeriod().getMillis()))
-                              .build())))
+                  .setAnyParam(Any.pack(slidingWindowsPayload))
+                  .setPayload(slidingWindowsPayload.toByteString()))
           .build();
     } else if (windowFn instanceof Sessions) {
+      SessionsPayload sessionsPayload =
+          SessionsPayload.newBuilder()
+              .setGapSize(Durations.fromMillis(((Sessions) windowFn).getGapDuration().getMillis()))
+              .build();
       return SdkFunctionSpec.newBuilder()
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(SESSION_WINDOWS_FN)
-                  .setParameter(
-                      Any.pack(
-                          StandardWindowFns.SessionsPayload.newBuilder()
-                              .setGapSize(Durations.fromMillis(
-                                  ((Sessions) windowFn).getGapDuration().getMillis()))
-                              .build())))
+                  .setAnyParam(Any.pack(sessionsPayload))
+                  .setPayload(sessionsPayload.toByteString()))
           .build();
     } else {
       return SdkFunctionSpec.newBuilder()
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
-                  .setParameter(
-                      Any.pack(
-                          BytesValue.newBuilder()
-                              .setValue(
-                                  ByteString.copyFrom(
-                                      SerializableUtils.serializeToByteArray(windowFn)))
-                              .build())))
+                  .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
+                  .setPayload(serializedFn))
           .build();
     }
   }
@@ -365,49 +356,41 @@ public class WindowingStrategyTranslation implements Serializable {
         case GLOBAL_WINDOWS_FN:
           return new GlobalWindows();
         case FIXED_WINDOWS_FN:
-          StandardWindowFns.FixedWindowsPayload fixedParams =
-              windowFnSpec
-                  .getSpec()
-                  .getParameter()
-                  .unpack(StandardWindowFns.FixedWindowsPayload.class);
+          StandardWindowFns.FixedWindowsPayload fixedParams = null;
+          fixedParams =
+              StandardWindowFns.FixedWindowsPayload.parseFrom(
+                  windowFnSpec.getSpec().getPayload());
           return FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize())))
               .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
         case SLIDING_WINDOWS_FN:
           StandardWindowFns.SlidingWindowsPayload slidingParams =
-              windowFnSpec
-                  .getSpec()
-                  .getParameter()
-                  .unpack(StandardWindowFns.SlidingWindowsPayload.class);
+              StandardWindowFns.SlidingWindowsPayload.parseFrom(
+                  windowFnSpec.getSpec().getPayload());
           return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize())))
               .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
               .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
         case SESSION_WINDOWS_FN:
           StandardWindowFns.SessionsPayload sessionParams =
-              windowFnSpec.getSpec().getParameter().unpack(StandardWindowFns.SessionsPayload.class);
+              StandardWindowFns.SessionsPayload.parseFrom(windowFnSpec.getSpec().getPayload());
           return Sessions.withGapDuration(
               Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
         case SERIALIZED_JAVA_WINDOWFN_URN:
         case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
           return (WindowFn<?, ?>)
               SerializableUtils.deserializeFromByteArray(
-                  windowFnSpec
-                      .getSpec()
-                      .getParameter()
-                      .unpack(BytesValue.class)
-                      .getValue()
-                      .toByteArray(),
-                  "WindowFn");
+                  windowFnSpec.getSpec().getPayload().toByteArray(), "WindowFn");
         default:
           throw new IllegalArgumentException(
               "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
       }
-    } catch (InvalidProtocolBufferException exc) {
+    } catch (InvalidProtocolBufferException e) {
       throw new IllegalArgumentException(
           String.format(
               "%s for %s with URN %s did not contain expected proto message for payload",
               FunctionSpec.class.getSimpleName(),
               WindowFn.class.getSimpleName(),
-              windowFnSpec.getSpec().getUrn()));
+              windowFnSpec.getSpec().getUrn()),
+          e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 7954b0e..aeefd4f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -25,9 +25,7 @@ import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -83,13 +81,9 @@ public class WriteFilesTranslation {
         .setSpec(
             FunctionSpec.newBuilder()
                 .setUrn(urn)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(serializable)))
-                            .build())))
+                .setPayload(
+                    ByteString.copyFrom(SerializableUtils.serializeToByteArray(serializable)))
+                .build())
         .build();
   }
 
@@ -102,8 +96,7 @@ public class WriteFilesTranslation {
         FunctionSpec.class.getSimpleName(),
         sinkProto.getSpec().getUrn());
 
-    byte[] serializedSink =
-        sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+    byte[] serializedSink = sinkProto.getSpec().getPayload().toByteArray();
 
     return (FileBasedSink<?, ?, ?>)
         SerializableUtils.deserializeFromByteArray(
@@ -163,11 +156,13 @@ public class WriteFilesTranslation {
       AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
           transform)
       throws IOException {
-    return PTransformTranslation.toProto(
-            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
-        .getSpec()
-        .getParameter()
-        .unpack(WriteFilesPayload.class);
+    return WriteFilesPayload.parseFrom(
+        PTransformTranslation.toProto(
+                transform,
+                Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+                SdkComponents.create())
+            .getSpec()
+            .getPayload());
   }
 
   static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
@@ -181,7 +176,7 @@ public class WriteFilesTranslation {
         AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) {
       return FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
-          .setParameter(Any.pack(toProto(transform.getTransform())))
+          .setPayload(toProto(transform.getTransform()).toByteString())
           .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
index 0d209a0..4f57af8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.BytesValue;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -98,8 +97,7 @@ public class CreatePCollectionViewTranslationTest {
       PCollectionView<?> deserializedView =
           (PCollectionView<?>)
               SerializableUtils.deserializeFromByteArray(
-                  payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
-                  PCollectionView.class.getSimpleName());
+                  payload.getPayload().toByteArray(), PCollectionView.class.getSimpleName());
 
       assertThat(
           deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView()));
@@ -126,7 +124,7 @@ public class CreatePCollectionViewTranslationTest {
       PCollectionView<?> deserializedView =
           (PCollectionView<?>)
               SerializableUtils.deserializeFromByteArray(
-                  payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+                  payload.getPayload().toByteArray(),
                   PCollectionView.class.getSimpleName());
 
       assertThat(

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index c31e803..680f940 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -156,8 +156,7 @@ public class ParDoTranslationTest {
       // Decode
       Pipeline rehydratedPipeline = Pipeline.create();
 
-      ParDoPayload parDoPayload =
-          protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+      ParDoPayload parDoPayload = ParDoPayload.parseFrom(protoTransform.getSpec().getPayload());
       for (PCollectionView<?> view : parDo.getSideInputs()) {
         SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
         PCollectionView<?> restoredView =

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
index e4336df..893f4b9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.TestStreamPayload;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -100,8 +101,7 @@ public class TestStreamTranslationTest {
 
       assertThat(spec.getUrn(), equalTo(TEST_STREAM_TRANSFORM_URN));
 
-      RunnerApi.TestStreamPayload payload =
-          spec.getParameter().unpack(RunnerApi.TestStreamPayload.class);
+      RunnerApi.TestStreamPayload payload = TestStreamPayload.parseFrom(spec.getPayload());
 
       verifyTestStreamEncoding(
           testStream, payload, RehydratedComponents.forComponents(components.toComponents()));

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 9afb565..fb5d47e 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -755,9 +755,12 @@ message FunctionSpec {
   // passed as-is.
   string urn = 1;
 
+  // (Deprecated)
+  google.protobuf.Any any_param = 2;
+
   // (Optional) The data specifying any parameters to the URN. If
   // the URN does not require any arguments, this may be omitted.
-  google.protobuf.Any parameter = 2;
+  bytes payload = 3;
 }
 
 // TODO: transfer javadoc here

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 1e611db..df0e5a2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -129,8 +129,8 @@ public class BeamFnDataReadRunner<OutputT> {
       BeamFnDataClient beamFnDataClientFactory,
       Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
           throws IOException {
-    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
-        .getApiServiceDescriptor();
+    this.apiServiceDescriptor =
+        BeamFnApi.RemoteGrpcPort.parseFrom(functionSpec.getPayload()).getApiServiceDescriptor();
     this.inputTarget = inputTarget;
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
     this.beamFnDataClientFactory = beamFnDataClientFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index bbed753..48b450a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -120,8 +120,8 @@ public class BeamFnDataWriteRunner<InputT> {
       Map<String, RunnerApi.Coder> coders,
       BeamFnDataClient beamFnDataClientFactory)
           throws IOException {
-    this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
-        .getApiServiceDescriptor();
+    this.apiServiceDescriptor =
+        BeamFnApi.RemoteGrpcPort.parseFrom(functionSpec.getPayload()).getApiServiceDescriptor();
     this.beamFnDataClientFactory = beamFnDataClientFactory;
     this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
     this.outputTarget = outputTarget;

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 4702e05..5f6509f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -22,7 +22,6 @@ import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.util.Collection;
@@ -122,17 +121,14 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
   public void start() throws Exception {
     try {
       // The representation here is defined as the java serialized representation of the
-      // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
-      byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
+      // bounded source object in a ByteString wrapper.
+      byte[] bytes = definition.getPayload().toByteArray();
       @SuppressWarnings("unchecked")
       InputT boundedSource =
           (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
       runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
     } catch (InvalidProtocolBufferException e) {
-      throw new IOException(
-          String.format("Failed to decode %s, expected %s",
-              definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
-          e);
+      throw new IOException(String.format("Failed to decode %s", definition.getUrn()), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 97bd71c..86168f9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -109,13 +107,7 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
           outputMapBuilder.build();
 
       // Get the DoFnInfo from the serialized blob.
-      ByteString serializedFn;
-      try {
-        serializedFn = pTransform.getSpec().getParameter().unpack(BytesValue.class).getValue();
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(
-            String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e);
-      }
+      ByteString serializedFn = pTransform.getSpec().getPayload();
       @SuppressWarnings({"unchecked", "rawtypes"})
       DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray(
           serializedFn.toByteArray(), "DoFnInfo");

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index d712f5f..92e6088 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.Any;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -80,7 +79,7 @@ public class BeamFnDataReadRunnerTest {
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
-      .setParameter(Any.pack(PORT_SPEC)).build();
+      .setPayload(PORT_SPEC.toByteString()).build();
   private static final Coder<WindowedValue<String>> CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
   private static final String CODER_SPEC_ID = "string-coder-id";
@@ -131,7 +130,7 @@ public class BeamFnDataReadRunnerTest {
 
     RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
         .setUrn("urn:org.apache.beam:source:runner:0.1")
-        .setParameter(Any.pack(PORT_SPEC))
+        .setPayload(PORT_SPEC.toByteString())
         .build();
 
     RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 0caf19e..ffa3a2d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -75,7 +74,7 @@ public class BeamFnDataWriteRunnerTest {
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
-      .setParameter(Any.pack(PORT_SPEC)).build();
+      .setPayload(PORT_SPEC.toByteString()).build();
   private static final String CODER_ID = "string-coder-id";
   private static final Coder<WindowedValue<String>> CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
@@ -117,7 +116,7 @@ public class BeamFnDataWriteRunnerTest {
 
     RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
         .setUrn("urn:org.apache.beam:sink:runner:0.1")
-        .setParameter(Any.pack(PORT_SPEC))
+        .setPayload(PORT_SPEC.toByteString())
         .build();
 
     RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index 7aec161..b9f22e8 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -31,9 +31,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -107,8 +105,7 @@ public class BoundedSourceRunnerTest {
 
     BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
         PipelineOptionsFactory.create(),
-        RunnerApi.FunctionSpec.newBuilder().setParameter(
-            Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
+        RunnerApi.FunctionSpec.newBuilder().setPayload(encodedSource).build(),
         consumers);
 
     runner.start();
@@ -127,13 +124,12 @@ public class BoundedSourceRunnerTest {
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
-    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
-        .setUrn("urn:org.apache.beam:source:java:0.1")
-        .setParameter(Any.pack(BytesValue.newBuilder()
-            .setValue(ByteString.copyFrom(
-                SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
-            .build()))
-        .build();
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn("urn:org.apache.beam:source:java:0.1")
+            .setPayload(
+                ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
+            .build();
 
     RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
         .setSpec(functionSpec)

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index e269bcc..efa8fcf 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -31,9 +31,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ServiceLoader;
@@ -102,12 +100,11 @@ public class FnApiDoFnRunnerTest {
         ImmutableMap.of(
             Long.parseLong(mainOutputId), TestDoFn.mainOutput,
             Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
-    RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
-        .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
-        .setParameter(Any.pack(BytesValue.newBuilder()
-            .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
-            .build()))
-        .build();
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+            .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
+            .build();
     RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
         .setSpec(functionSpec)
         .putInputs("inputA", "inputATarget")

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index c56ef52..7ced5a9 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -206,22 +206,21 @@ class Coder(object):
     """For internal use only; no backwards-compatibility guarantees.
     """
     # TODO(BEAM-115): Use specialized URNs and components.
+    serialized_coder = serialize_coder(self)
     return beam_runner_api_pb2.Coder(
         spec=beam_runner_api_pb2.SdkFunctionSpec(
             spec=beam_runner_api_pb2.FunctionSpec(
                 urn=urns.PICKLED_CODER,
-                parameter=proto_utils.pack_Any(
+                any_param=proto_utils.pack_Any(
                     google.protobuf.wrappers_pb2.BytesValue(
-                        value=serialize_coder(self))))))
+                        value=serialized_coder)),
+                payload=serialized_coder)))
 
   @staticmethod
   def from_runner_api(proto, context):
     """For internal use only; no backwards-compatibility guarantees.
     """
-    any_proto = proto.spec.spec.parameter
-    bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
-    any_proto.Unpack(bytes_proto)
-    return deserialize_coder(bytes_proto.value)
+    return deserialize_coder(proto.spec.spec.payload)
 
 
 class StrUtf8Coder(Coder):

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 3222bcb..7c0c06f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -218,16 +218,16 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       def side_inputs(self):
         for transform in self.transforms:
           if transform.spec.urn == urns.PARDO_TRANSFORM:
-            payload = proto_utils.unpack_Any(
-                transform.spec.parameter, beam_runner_api_pb2.ParDoPayload)
+            payload = proto_utils.parse_Bytes(
+                transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
             for side_input in payload.side_inputs:
               yield transform.inputs[side_input]
 
       def has_as_main_input(self, pcoll):
         for transform in self.transforms:
           if transform.spec.urn == urns.PARDO_TRANSFORM:
-            payload = proto_utils.unpack_Any(
-                transform.spec.parameter, beam_runner_api_pb2.ParDoPayload)
+            payload = proto_utils.parse_Bytes(
+                transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
             local_side_inputs = payload.side_inputs
           else:
             local_side_inputs = {}
@@ -257,9 +257,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         transform = stage.transforms[0]
         if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM:
           # This is used later to correlate the read and write.
-          param = proto_utils.pack_Any(
-              wrappers_pb2.BytesValue(
-                  value=str("group:%s" % stage.name)))
+          param = str("group:%s" % stage.name)
           gbk_write = Stage(
               transform.unique_name + '/Write',
               [beam_runner_api_pb2.PTransform(
@@ -267,7 +265,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                   inputs=transform.inputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_OUTPUT_URN,
-                      parameter=param))],
+                      any_param=proto_utils.pack_Any(
+                          wrappers_pb2.BytesValue(value=param)),
+                      payload=param))],
               downstream_side_inputs=frozenset(),
               must_follow=stage.must_follow)
           yield gbk_write
@@ -279,7 +279,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      parameter=param))],
+                      any_param=proto_utils.pack_Any(
+                          wrappers_pb2.BytesValue(value=param)),
+                      payload=param))],
               downstream_side_inputs=frozenset(),
               must_follow=union(frozenset([gbk_write]), stage.must_follow))
         else:
@@ -299,9 +301,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         transform = stage.transforms[0]
         if transform.spec.urn == urns.FLATTEN_TRANSFORM:
           # This is used later to correlate the read and writes.
-          param = proto_utils.pack_Any(
-              wrappers_pb2.BytesValue(
-                  value=str("materialize:%s" % transform.unique_name)))
+          param = str("materialize:%s" % transform.unique_name)
           output_pcoll_id, = transform.outputs.values()
           output_coder_id = pcollections[output_pcoll_id].coder_id
           flatten_writes = []
@@ -337,7 +337,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                     inputs={local_in: transcoded_pcollection},
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_OUTPUT_URN,
-                        parameter=param))],
+                        any_param=proto_utils.pack_Any(
+                            wrappers_pb2.BytesValue(
+                                value=param)),
+                        payload=param))],
                 downstream_side_inputs=frozenset(),
                 must_follow=stage.must_follow)
             flatten_writes.append(flatten_write)
@@ -350,7 +353,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      parameter=param))],
+                      any_param=proto_utils.pack_Any(
+                          wrappers_pb2.BytesValue(
+                              value=param)),
+                      payload=param))],
               downstream_side_inputs=frozenset(),
               must_follow=union(frozenset(flatten_writes), stage.must_follow))
 
@@ -439,9 +445,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
       # Now try to fuse away all pcollections.
       for pcoll, producer in producers_by_pcoll.items():
-        pcoll_as_param = proto_utils.pack_Any(
-            wrappers_pb2.BytesValue(
-                value=str("materialize:%s" % pcoll)))
+        pcoll_as_param = str("materialize:%s" % pcoll)
         write_pcoll = None
         for consumer in consumers_by_pcoll[pcoll]:
           producer = replacement(producer)
@@ -461,7 +465,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                       inputs={'in': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_OUTPUT_URN,
-                          parameter=pcoll_as_param))])
+                          any_param=proto_utils.pack_Any(
+                              wrappers_pb2.BytesValue(
+                                  value=pcoll_as_param)),
+                          payload=pcoll_as_param))])
               fuse(producer, write_pcoll)
             if consumer.has_as_main_input(pcoll):
               read_pcoll = Stage(
@@ -471,7 +478,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                       outputs={'out': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
-                          parameter=pcoll_as_param))],
+                          any_param=proto_utils.pack_Any(
+                              wrappers_pb2.BytesValue(
+                                  value=pcoll_as_param)),
+                          payload=pcoll_as_param))],
                   must_follow={write_pcoll})
               fuse(read_pcoll, consumer)
 
@@ -567,8 +577,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       data_side_input = {}
       data_output = {}
       for transform in stage.transforms:
-        pcoll_id = proto_utils.unpack_Any(
-            transform.spec.parameter, wrappers_pb2.BytesValue).value
+        pcoll_id = transform.spec.payload
         if transform.spec.urn in (bundle_processor.DATA_INPUT_URN,
                                   bundle_processor.DATA_OUTPUT_URN):
           if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
@@ -580,9 +589,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
           else:
             raise NotImplementedError
           if data_operation_spec:
-            transform.spec.parameter.CopyFrom(data_operation_spec)
+            transform.spec.payload = data_operation_spec
+            transform.spec.any_param.CopyFrom(data_operation_spec)
           else:
-            transform.spec.parameter.Clear()
+            transform.spec.payload = ""
+            transform.spec.any_param.Clear()
       return data_input, data_side_input, data_output
 
     logging.info('Running %s', stage.name)
@@ -728,7 +739,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         runner_sinks[(transform_id, target_name)] = operation
         transform_spec = beam_runner_api_pb2.FunctionSpec(
             urn=bundle_processor.DATA_OUTPUT_URN,
-            parameter=proto_utils.pack_Any(data_operation_spec))
+            any_param=proto_utils.pack_Any(data_operation_spec),
+            payload=data_operation_spec.SerializeToString() \
+                if data_operation_spec is not None else None)
 
       elif isinstance(operation, operation_specs.WorkerRead):
         # A Read from an in-memory source is done over the data plane.
@@ -742,19 +755,23 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
               operation.source.source.default_output_coder())
           transform_spec = beam_runner_api_pb2.FunctionSpec(
               urn=bundle_processor.DATA_INPUT_URN,
-              parameter=proto_utils.pack_Any(data_operation_spec))
+              any_param=proto_utils.pack_Any(data_operation_spec),
+              payload=data_operation_spec.SerializeToString() \
+                  if data_operation_spec is not None else None)
 
         else:
           # Otherwise serialize the source and execute it there.
           # TODO: Use SDFs with an initial impulse.
           # The Dataflow runner harness strips the base64 encoding. do the same
           # here until we get the same thing back that we sent in.
+          source_bytes = base64.b64decode(
+              pickler.dumps(operation.source.source))
           transform_spec = beam_runner_api_pb2.FunctionSpec(
               urn=bundle_processor.PYTHON_SOURCE_URN,
-              parameter=proto_utils.pack_Any(
+              any_param=proto_utils.pack_Any(
                   wrappers_pb2.BytesValue(
-                      value=base64.b64decode(
-                          pickler.dumps(operation.source.source)))))
+                      value=source_bytes)),
+              payload=source_bytes)
 
       elif isinstance(operation, operation_specs.WorkerDoFn):
         # Record the contents of each side input for access via the state api.
@@ -773,8 +790,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             (operation.serialized_fn, side_input_extras))
         transform_spec = beam_runner_api_pb2.FunctionSpec(
             urn=bundle_processor.PYTHON_DOFN_URN,
-            parameter=proto_utils.pack_Any(
-                wrappers_pb2.BytesValue(value=augmented_serialized_fn)))
+            any_param=proto_utils.pack_Any(
+                wrappers_pb2.BytesValue(value=augmented_serialized_fn)),
+            payload=augmented_serialized_fn)
 
       elif isinstance(operation, operation_specs.WorkerFlatten):
         # Flatten is nice and simple.

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 9474eda..16c888c 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -282,9 +282,9 @@ class BeamTransformFactory(object):
   def create_operation(self, transform_id, consumers):
     transform_proto = self.descriptor.transforms[transform_id]
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
-    parameter = proto_utils.unpack_Any(
-        transform_proto.spec.parameter, parameter_type)
-    return creator(self, transform_id, transform_proto, parameter, consumers)
+    payload = proto_utils.parse_Bytes(
+        transform_proto.spec.payload, parameter_type)
+    return creator(self, transform_id, transform_proto, payload, consumers)
 
   def get_coder(self, coder_id):
     coder_proto = self.descriptor.coders[coder_id]
@@ -293,9 +293,7 @@ class BeamTransformFactory(object):
     else:
       # No URN, assume cloud object encoding json bytes.
       return operation_specs.get_coder_from_spec(
-          json.loads(
-              proto_utils.unpack_Any(coder_proto.spec.spec.parameter,
-                                     wrappers_pb2.BytesValue).value))
+          json.loads(coder_proto.spec.spec.payload))
 
   def get_output_coders(self, transform_proto):
     return {
@@ -360,10 +358,10 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers):
       data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
 
 
-@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue)
+@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, None)
 def create(factory, transform_id, transform_proto, parameter, consumers):
   # The Dataflow runner harness strips the base64 encoding.
-  source = pickler.loads(base64.b64encode(parameter.value))
+  source = pickler.loads(base64.b64encode(parameter))
   spec = operation_specs.WorkerRead(
       iobase.SourceBundle(1.0, source, None, None),
       [WindowedValueCoder(source.default_output_coder())])
@@ -395,9 +393,9 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
       consumers)
 
 
-@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, wrappers_pb2.BytesValue)
+@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, None)
 def create(factory, transform_id, transform_proto, parameter, consumers):
-  dofn_data = pickler.loads(parameter.value)
+  dofn_data = pickler.loads(parameter)
   if len(dofn_data) == 2:
     # Has side input data.
     serialized_fn, side_input_data = dofn_data
@@ -413,8 +411,7 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
     urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
 def create(factory, transform_id, transform_proto, parameter, consumers):
   assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
-  serialized_fn = proto_utils.unpack_Any(
-      parameter.do_fn.spec.parameter, wrappers_pb2.BytesValue).value
+  serialized_fn = parameter.do_fn.spec.payload
   dofn_data = pickler.loads(serialized_fn)
   if len(dofn_data) == 2:
     # Has side input data.

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3f92ce9..9018a49 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -699,24 +699,24 @@ class ParDo(PTransformWithSideInputs):
 
   def to_runner_api_parameter(self, context):
     assert self.__class__ is ParDo
+    picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
     return (
         urns.PARDO_TRANSFORM,
         beam_runner_api_pb2.ParDoPayload(
             do_fn=beam_runner_api_pb2.SdkFunctionSpec(
                 spec=beam_runner_api_pb2.FunctionSpec(
                     urn=urns.PICKLED_DO_FN_INFO,
-                    parameter=proto_utils.pack_Any(
+                    any_param=proto_utils.pack_Any(
                         wrappers_pb2.BytesValue(
-                            value=pickler.dumps(
-                                self._pardo_fn_data())))))))
+                            value=picked_pardo_fn_data)),
+                    payload=picked_pardo_fn_data))))
 
   @PTransform.register_urn(
       urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
   def from_runner_api_parameter(pardo_payload, context):
     assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
     fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
-        proto_utils.unpack_Any(
-            pardo_payload.do_fn.spec.parameter, wrappers_pb2.BytesValue).value)
+        pardo_payload.do_fn.spec.payload)
     if si_tags_and_types:
       raise NotImplementedError('deferred side inputs')
     elif windowing:

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da113e0..a798fa1 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -442,7 +442,9 @@ class PTransform(WithTypeHints, HasDisplayData):
     urn, typed_param = self.to_runner_api_parameter(context)
     return beam_runner_api_pb2.FunctionSpec(
         urn=urn,
-        parameter=proto_utils.pack_Any(typed_param))
+        any_param=proto_utils.pack_Any(typed_param),
+        payload=typed_param.SerializeToString()
+        if typed_param is not None else None)
 
   @classmethod
   def from_runner_api(cls, proto, context):
@@ -450,7 +452,7 @@ class PTransform(WithTypeHints, HasDisplayData):
       return None
     parameter_type, constructor = cls._known_urns[proto.urn]
     return constructor(
-        proto_utils.unpack_Any(proto.parameter, parameter_type),
+        proto_utils.parse_Bytes(proto.payload, parameter_type),
         context)
 
   def to_runner_api_parameter(self, context):

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index af8f218..d7693f3 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -46,6 +46,17 @@ def unpack_Any(any_msg, msg_class):
   return msg
 
 
+def parse_Bytes(bytes, msg_class):
+  """Parses the String of bytes into msg_class.
+
+  Returns the input bytes if msg_class is None."""
+  if msg_class is None:
+    return bytes
+  msg = msg_class()
+  msg.ParseFromString(bytes)
+  return msg
+
+
 def pack_Struct(**kwargs):
   """Returns a struct containing the values indicated by kwargs.
   """

http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 0013cb3..acf729f 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -120,7 +120,9 @@ class RunnerApiFn(object):
     return beam_runner_api_pb2.SdkFunctionSpec(
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=urn,
-            parameter=proto_utils.pack_Any(typed_param)))
+            any_param=proto_utils.pack_Any(typed_param),
+            payload=typed_param.SerializeToString()
+            if typed_param is not None else None))
 
   @classmethod
   def from_runner_api(cls, fn_proto, context):
@@ -130,5 +132,5 @@ class RunnerApiFn(object):
     """
     parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
     return constructor(
-        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+        proto_utils.parse_Bytes(fn_proto.spec.payload, parameter_type),
         context)


[2/2] beam git commit: This closes #3193

Posted by tg...@apache.org.
This closes #3193


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ed2cf41
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ed2cf41
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ed2cf41

Branch: refs/heads/master
Commit: 9ed2cf41f1f068b3b27e9a814d6f7ceed3406dbf
Parents: f5714f2 2b9b050
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 8 18:19:00 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Aug 8 18:19:00 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CoderTranslation.java     | 16 +---
 .../core/construction/CombineTranslation.java   | 28 ++----
 .../CreatePCollectionViewTranslation.java       | 17 +---
 .../construction/PTransformTranslation.java     | 12 +--
 .../core/construction/ParDoTranslation.java     | 58 +++++-------
 .../core/construction/PipelineTranslation.java  | 12 +--
 .../core/construction/ReadTranslation.java      | 56 +++++------
 .../construction/TestStreamTranslation.java     |  5 +-
 .../construction/WindowIntoTranslation.java     |  9 +-
 .../WindowingStrategyTranslation.java           | 97 ++++++++------------
 .../construction/WriteFilesTranslation.java     | 29 +++---
 .../CreatePCollectionViewTranslationTest.java   |  6 +-
 .../core/construction/ParDoTranslationTest.java |  3 +-
 .../construction/TestStreamTranslationTest.java |  4 +-
 .../src/main/proto/beam_runner_api.proto        |  5 +-
 .../beam/fn/harness/BeamFnDataReadRunner.java   |  4 +-
 .../beam/fn/harness/BeamFnDataWriteRunner.java  |  4 +-
 .../beam/fn/harness/BoundedSourceRunner.java    | 10 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +-
 .../fn/harness/BeamFnDataReadRunnerTest.java    |  5 +-
 .../fn/harness/BeamFnDataWriteRunnerTest.java   |  5 +-
 .../fn/harness/BoundedSourceRunnerTest.java     | 18 ++--
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    | 13 +--
 sdks/python/apache_beam/coders/coders.py        | 11 +--
 .../runners/portability/fn_api_runner.py        | 78 ++++++++++------
 .../runners/worker/bundle_processor.py          | 21 ++---
 sdks/python/apache_beam/transforms/core.py      | 10 +-
 .../python/apache_beam/transforms/ptransform.py |  6 +-
 sdks/python/apache_beam/utils/proto_utils.py    | 11 +++
 sdks/python/apache_beam/utils/urns.py           |  6 +-
 30 files changed, 250 insertions(+), 319 deletions(-)
----------------------------------------------------------------------