You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/08/09 01:19:17 UTC
[1/2] beam git commit: Use bytes instead of Any in
RunnerApi.FunctionSpec
Repository: beam
Updated Branches:
refs/heads/master f5714f220 -> 9ed2cf41f
Use bytes instead of Any in RunnerApi.FunctionSpec
Keep a "any" field, renamed to any_param.
Rename `parameter` to `payload`
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b9b0504
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b9b0504
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b9b0504
Branch: refs/heads/master
Commit: 2b9b05049822a22154ac3c2f6b593061f42b54c1
Parents: f5714f2
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 5 11:22:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Aug 8 18:18:59 2017 -0700
----------------------------------------------------------------------
.../core/construction/CoderTranslation.java | 16 +---
.../core/construction/CombineTranslation.java | 28 ++----
.../CreatePCollectionViewTranslation.java | 17 +---
.../construction/PTransformTranslation.java | 12 +--
.../core/construction/ParDoTranslation.java | 58 +++++-------
.../core/construction/PipelineTranslation.java | 12 +--
.../core/construction/ReadTranslation.java | 56 +++++------
.../construction/TestStreamTranslation.java | 5 +-
.../construction/WindowIntoTranslation.java | 9 +-
.../WindowingStrategyTranslation.java | 97 ++++++++------------
.../construction/WriteFilesTranslation.java | 29 +++---
.../CreatePCollectionViewTranslationTest.java | 6 +-
.../core/construction/ParDoTranslationTest.java | 3 +-
.../construction/TestStreamTranslationTest.java | 4 +-
.../src/main/proto/beam_runner_api.proto | 5 +-
.../beam/fn/harness/BeamFnDataReadRunner.java | 4 +-
.../beam/fn/harness/BeamFnDataWriteRunner.java | 4 +-
.../beam/fn/harness/BoundedSourceRunner.java | 10 +-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +-
.../fn/harness/BeamFnDataReadRunnerTest.java | 5 +-
.../fn/harness/BeamFnDataWriteRunnerTest.java | 5 +-
.../fn/harness/BoundedSourceRunnerTest.java | 18 ++--
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 13 +--
sdks/python/apache_beam/coders/coders.py | 11 +--
.../runners/portability/fn_api_runner.py | 78 ++++++++++------
.../runners/worker/bundle_processor.py | 21 ++---
sdks/python/apache_beam/transforms/core.py | 10 +-
.../python/apache_beam/transforms/ptransform.py | 6 +-
sdks/python/apache_beam/utils/proto_utils.py | 11 +++
sdks/python/apache_beam/utils/urns.py | 6 +-
30 files changed, 250 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
index a6719ff..2246f81 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
@@ -24,9 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -138,13 +136,9 @@ public class CoderTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_CODER_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(coder)))
- .build()))))
+ .setPayload(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(coder)))
+ .build()))
.build();
}
@@ -182,9 +176,7 @@ public class CoderTranslation {
protoCoder
.getSpec()
.getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
+ .getPayload()
.toByteArray(),
"Custom Coder Bytes");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index d909ccf..17c48dc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -23,9 +23,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.CO
import com.google.auto.service.AutoService;
import com.google.common.collect.Iterables;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -79,7 +77,7 @@ public class CombineTranslation {
CombinePayload payload = toProto(transform, components);
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(COMBINE_TRANSFORM_URN)
- .setParameter(Any.pack(payload))
+ .setPayload(payload.toByteString())
.build();
}
@@ -138,13 +136,9 @@ public class CombineTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(combineFn)))
- .build())))
+ .setPayload(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn)))
+ .build())
.build();
}
@@ -171,9 +165,7 @@ public class CombineTranslation {
payload
.getCombineFn()
.getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
+ .getPayload()
.toByteArray(),
"CombineFn");
}
@@ -190,10 +182,10 @@ public class CombineTranslation {
private static CombinePayload getCombinePayload(
AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws IOException {
- return PTransformTranslation.toProto(
- transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components)
- .getSpec()
- .getParameter()
- .unpack(CombinePayload.class);
+ return CombinePayload.parseFrom(
+ PTransformTranslation.toProto(
+ transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), components)
+ .getSpec()
+ .getPayload());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index c67d688..1027ea2 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -21,9 +21,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
@@ -79,9 +77,7 @@ public class CreatePCollectionViewTranslation {
SerializableUtils.deserializeFromByteArray(
transformProto
.getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
+ .getPayload()
.toByteArray(),
PCollectionView.class.getSimpleName());
}
@@ -104,14 +100,9 @@ public class CreatePCollectionViewTranslation {
SdkComponents components) {
return FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(
- transform.getTransform().getView())))
- .build()))
+ .setPayload(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(transform.getTransform().getView())))
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index b8365c9..4bfe17a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -131,9 +131,9 @@ public class PTransformTranslation {
if (rawPTransform.getUrn() != null) {
FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
- @Nullable Any parameter = rawPTransform.getPayload();
+ @Nullable ByteString parameter = rawPTransform.getPayload();
if (parameter != null) {
- payload.setParameter(parameter);
+ payload.setPayload(parameter);
}
transformBuilder.setSpec(payload);
}
@@ -224,7 +224,7 @@ public class PTransformTranslation {
public abstract String getUrn();
@Nullable
- public Any getPayload() {
+ public ByteString getPayload() {
return null;
}
@@ -254,9 +254,9 @@ public class PTransformTranslation {
FunctionSpec.Builder transformSpec =
FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
- Any payload = transform.getTransform().getPayload();
+ ByteString payload = transform.getTransform().getPayload();
if (payload != null) {
- transformSpec.setParameter(payload);
+ transformSpec.setPayload(payload);
}
// Transforms like Combine may have Coders that need to be added but do not
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5765c51..6ae95e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -29,9 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.Serializable;
@@ -122,7 +120,7 @@ public class ParDoTranslation {
ParDoPayload payload = toProto(transform.getTransform(), components);
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(PAR_DO_TRANSFORM_URN)
- .setParameter(Any.pack(payload))
+ .setPayload(payload.toByteString())
.build();
}
@@ -240,7 +238,7 @@ public class ParDoTranslation {
RunnerApi.PTransform protoTransform =
PTransformTranslation.toProto(application, SdkComponents.create());
- ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+ ParDoPayload payload = ParDoPayload.parseFrom(protoTransform.getSpec().getPayload());
TupleTag<?> mainOutputTag = getMainOutputTag(payload);
Set<String> outputTags =
Sets.difference(
@@ -259,7 +257,7 @@ public class ParDoTranslation {
SdkComponents sdkComponents = SdkComponents.create();
RunnerApi.PTransform parDoProto =
PTransformTranslation.toProto(application, sdkComponents);
- ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+ ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload());
List<PCollectionView<?>> views = new ArrayList<>();
RehydratedComponents components =
@@ -289,7 +287,7 @@ public class ParDoTranslation {
ptransform.getSpec().getUrn().equals(PAR_DO_TRANSFORM_URN),
"Unexpected payload type %s",
ptransform.getSpec().getUrn());
- ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+ ParDoPayload payload = ParDoPayload.parseFrom(ptransform.getSpec().getPayload());
String mainInputId =
Iterables.getOnlyElement(
Sets.difference(
@@ -377,7 +375,7 @@ public class ParDoTranslation {
Combine.CombineFn<?, ?, ?> combineFn =
(Combine.CombineFn<?, ?, ?>)
SerializableUtils.deserializeFromByteArray(
- combineFnSpec.getParameter().unpack(BytesValue.class).toByteArray(),
+ combineFnSpec.getPayload().toByteArray(),
Combine.CombineFn.class.getSimpleName());
// Rawtype coder cast because it is required to be a valid accumulator coder
@@ -443,14 +441,10 @@ public class ParDoTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_JAVA_DO_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(
- DoFnAndMainOutput.of(fn, tag))))
- .build())))
+ .setPayload(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(DoFnAndMainOutput.of(fn, tag))))
+ .build())
.build();
}
@@ -458,7 +452,7 @@ public class ParDoTranslation {
throws InvalidProtocolBufferException {
checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN));
byte[] serializedFn =
- fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+ fnSpec.getSpec().getPayload().toByteArray();
return (DoFnAndMainOutput)
SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag");
}
@@ -542,22 +536,17 @@ public class ParDoTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_JAVA_VIEW_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
- .build())))
+ .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
+ .build())
.build();
}
private static <T> ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform)
throws IOException {
- return PTransformTranslation.toProto(
- transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
- .getSpec()
- .getParameter()
- .unpack(ParDoPayload.class);
+ RunnerApi.PTransform parDoPTransform =
+ PTransformTranslation.toProto(
+ transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create());
+ return ParDoPayload.parseFrom(parDoPTransform.getSpec().getPayload());
}
public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
@@ -580,7 +569,7 @@ public class ParDoTranslation {
spec.getUrn());
return (ViewFn<?, ?>)
SerializableUtils.deserializeFromByteArray(
- spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn");
+ spec.getPayload().toByteArray(), "Custom ViewFn");
}
private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
@@ -588,13 +577,9 @@ public class ParDoTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowMappingFn)))
- .build())))
+ .setPayload(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowMappingFn)))
+ .build())
.build();
}
@@ -608,7 +593,6 @@ public class ParDoTranslation {
spec.getUrn());
return (WindowMappingFn<?>)
SerializableUtils.deserializeFromByteArray(
- spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
- "Custom WinodwMappingFn");
+ spec.getPayload().toByteArray(), "Custom WinodwMappingFn");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 9e4839a..d928338 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -24,7 +24,7 @@ import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
-import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -158,7 +158,7 @@ public class PipelineTranslation {
// TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
RunnerApi.ParDoPayload payload =
- transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class);
+ RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
List<PCollectionView<?>> views = new ArrayList<>();
for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
@@ -182,7 +182,7 @@ public class PipelineTranslation {
List<Coder<?>> additionalCoders = Collections.emptyList();
if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
RunnerApi.CombinePayload payload =
- transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class);
+ RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload());
additionalCoders =
(List)
Collections.singletonList(
@@ -192,7 +192,7 @@ public class PipelineTranslation {
RehydratedPTransform transform =
RehydratedPTransform.of(
transformSpec.getUrn(),
- transformSpec.getParameter(),
+ transformSpec.getPayload(),
additionalInputs,
additionalCoders);
@@ -233,7 +233,7 @@ public class PipelineTranslation {
public abstract String getUrn();
@Nullable
- public abstract Any getPayload();
+ public abstract ByteString getPayload();
@Override
public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
@@ -242,7 +242,7 @@ public class PipelineTranslation {
public static RehydratedPTransform of(
String urn,
- Any payload,
+ ByteString payload,
Map<TupleTag<?>, PValue> additionalInputs,
List<Coder<?>> additionalCoders) {
return new AutoValue_PipelineTranslation_RehydratedPTransform(
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index 572384b..06d1074 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -22,9 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collections;
@@ -83,12 +81,8 @@ public class ReadTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_BOUNDED_SOURCE)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
- .build())))
+ .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+ .build())
.build();
}
@@ -99,9 +93,7 @@ public class ReadTranslation {
payload
.getSource()
.getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
+ .getPayload()
.toByteArray(),
"BoundedSource");
}
@@ -122,11 +114,13 @@ public class ReadTranslation {
private static <T> ReadPayload getReadPayload(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
throws IOException {
- return PTransformTranslation.toProto(
- transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
- .getSpec()
- .getParameter()
- .unpack(ReadPayload.class);
+ return ReadPayload.parseFrom(
+ PTransformTranslation.toProto(
+ transform,
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ SdkComponents.create())
+ .getSpec()
+ .getPayload());
}
private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
@@ -134,12 +128,8 @@ public class ReadTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(JAVA_SERIALIZED_UNBOUNDED_SOURCE)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
- .build())))
+ .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(source)))
+ .build())
.build();
}
@@ -150,9 +140,7 @@ public class ReadTranslation {
payload
.getSource()
.getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
+ .getPayload()
.toByteArray(),
"BoundedSource");
}
@@ -160,13 +148,13 @@ public class ReadTranslation {
public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {
try {
return PCollectionTranslation.fromProto(
- PTransformTranslation.toProto(
- transform,
- Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
- SdkComponents.create())
- .getSpec()
- .getParameter()
- .unpack(ReadPayload.class)
+ ReadPayload.parseFrom(
+ PTransformTranslation.toProto(
+ transform,
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ SdkComponents.create())
+ .getSpec()
+ .getPayload())
.getIsBounded());
} catch (IOException e) {
throw new RuntimeException("Internal error determining boundedness of Read", e);
@@ -195,7 +183,7 @@ public class ReadTranslation {
ReadPayload payload = toProto(transform.getTransform());
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
- .setParameter(Any.pack(payload))
+ .setPayload(payload.toByteString())
.build();
}
}
@@ -222,7 +210,7 @@ public class ReadTranslation {
ReadPayload payload = toProto(transform.getTransform());
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
- .setParameter(Any.pack(payload))
+ .setPayload(payload.toByteString())
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index f23b2ec..cac7cdc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
import com.google.auto.service.AutoService;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
@@ -96,7 +95,7 @@ public class TestStreamTranslation {
TestStream.class.getSimpleName(),
transformProto.getSpec().getUrn());
RunnerApi.TestStreamPayload testStreamPayload =
- transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
+ RunnerApi.TestStreamPayload.parseFrom(transformProto.getSpec().getPayload());
return (TestStream<T>)
fromProto(
@@ -185,7 +184,7 @@ public class TestStreamTranslation {
throws IOException {
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
- .setParameter(Any.pack(testStreamToPayload(transform.getTransform(), components)))
+ .setPayload(testStreamToPayload(transform.getTransform(), components).toByteString())
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 6aec908..94ef22d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
-import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collections;
@@ -54,8 +53,8 @@ public class WindowIntoTranslation {
AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) {
return FunctionSpec.newBuilder()
.setUrn("urn:beam:transform:window:v1")
- .setParameter(
- Any.pack(WindowIntoTranslation.toProto(transform.getTransform(), components)))
+ .setPayload(
+ WindowIntoTranslation.toProto(transform.getTransform(), components).toByteString())
.build();
}
}
@@ -88,7 +87,7 @@ public class WindowIntoTranslation {
WindowIntoPayload windowIntoPayload;
try {
- return transformProto.getSpec().getParameter().unpack(WindowIntoPayload.class);
+ return WindowIntoPayload.parseFrom(transformProto.getSpec().getPayload());
} catch (InvalidProtocolBufferException exc) {
throw new IllegalStateException(
String.format(
@@ -128,7 +127,7 @@ public class WindowIntoTranslation {
WindowIntoPayload payload = toProto(transform.getTransform(), components);
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
- .setParameter(Any.pack(payload))
+ .setPayload(payload.toByteString())
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 565b552..ab50ea2 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -31,6 +31,9 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.common.runner.v1.StandardWindowFns;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.FixedWindowsPayload;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.SessionsPayload;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns.SlidingWindowsPayload;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -199,77 +202,65 @@ public class WindowingStrategyTranslation implements Serializable {
public static SdkFunctionSpec toProto(
WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
// TODO: Set environment IDs
+ ByteString serializedFn = ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn));
if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowFn)))
- .build())))
+ .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
+ .setPayload(serializedFn)
+ .build())
.build();
} else if (windowFn instanceof GlobalWindows) {
return SdkFunctionSpec.newBuilder()
.setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
.build();
} else if (windowFn instanceof FixedWindows) {
+ FixedWindowsPayload fixedWindowsPayload =
+ FixedWindowsPayload.newBuilder()
+ .setSize(Durations.fromMillis(((FixedWindows) windowFn).getSize().getMillis()))
+ .setOffset(Timestamps.fromMillis(((FixedWindows) windowFn).getOffset().getMillis()))
+ .build();
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(FIXED_WINDOWS_FN)
- .setParameter(
- Any.pack(
- StandardWindowFns.FixedWindowsPayload.newBuilder()
- .setSize(Durations.fromMillis(
- ((FixedWindows) windowFn).getSize().getMillis()))
- .setOffset(Timestamps.fromMillis(
- ((FixedWindows) windowFn).getOffset().getMillis()))
- .build())))
+ .setAnyParam(Any.pack(fixedWindowsPayload))
+ .setPayload(fixedWindowsPayload.toByteString()))
.build();
} else if (windowFn instanceof SlidingWindows) {
+ SlidingWindowsPayload slidingWindowsPayload = SlidingWindowsPayload.newBuilder()
+ .setSize(Durations.fromMillis(((SlidingWindows) windowFn).getSize().getMillis()))
+ .setOffset(Timestamps.fromMillis(((SlidingWindows) windowFn).getOffset().getMillis()))
+ .setPeriod(Durations.fromMillis(((SlidingWindows) windowFn).getPeriod().getMillis()))
+ .build();
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SLIDING_WINDOWS_FN)
- .setParameter(
- Any.pack(
- StandardWindowFns.SlidingWindowsPayload.newBuilder()
- .setSize(Durations.fromMillis(
- ((SlidingWindows) windowFn).getSize().getMillis()))
- .setOffset(Timestamps.fromMillis(
- ((SlidingWindows) windowFn).getOffset().getMillis()))
- .setPeriod(Durations.fromMillis(
- ((SlidingWindows) windowFn).getPeriod().getMillis()))
- .build())))
+ .setAnyParam(Any.pack(slidingWindowsPayload))
+ .setPayload(slidingWindowsPayload.toByteString()))
.build();
} else if (windowFn instanceof Sessions) {
+ SessionsPayload sessionsPayload =
+ SessionsPayload.newBuilder()
+ .setGapSize(Durations.fromMillis(((Sessions) windowFn).getGapDuration().getMillis()))
+ .build();
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SESSION_WINDOWS_FN)
- .setParameter(
- Any.pack(
- StandardWindowFns.SessionsPayload.newBuilder()
- .setGapSize(Durations.fromMillis(
- ((Sessions) windowFn).getGapDuration().getMillis()))
- .build())))
+ .setAnyParam(Any.pack(sessionsPayload))
+ .setPayload(sessionsPayload.toByteString()))
.build();
} else {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowFn)))
- .build())))
+ .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
+ .setPayload(serializedFn))
.build();
}
}
@@ -365,49 +356,41 @@ public class WindowingStrategyTranslation implements Serializable {
case GLOBAL_WINDOWS_FN:
return new GlobalWindows();
case FIXED_WINDOWS_FN:
- StandardWindowFns.FixedWindowsPayload fixedParams =
- windowFnSpec
- .getSpec()
- .getParameter()
- .unpack(StandardWindowFns.FixedWindowsPayload.class);
+ StandardWindowFns.FixedWindowsPayload fixedParams = null;
+ fixedParams =
+ StandardWindowFns.FixedWindowsPayload.parseFrom(
+ windowFnSpec.getSpec().getPayload());
return FixedWindows.of(Duration.millis(Durations.toMillis(fixedParams.getSize())))
.withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
case SLIDING_WINDOWS_FN:
StandardWindowFns.SlidingWindowsPayload slidingParams =
- windowFnSpec
- .getSpec()
- .getParameter()
- .unpack(StandardWindowFns.SlidingWindowsPayload.class);
+ StandardWindowFns.SlidingWindowsPayload.parseFrom(
+ windowFnSpec.getSpec().getPayload());
return SlidingWindows.of(Duration.millis(Durations.toMillis(slidingParams.getSize())))
.every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
.withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
case SESSION_WINDOWS_FN:
StandardWindowFns.SessionsPayload sessionParams =
- windowFnSpec.getSpec().getParameter().unpack(StandardWindowFns.SessionsPayload.class);
+ StandardWindowFns.SessionsPayload.parseFrom(windowFnSpec.getSpec().getPayload());
return Sessions.withGapDuration(
Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
case SERIALIZED_JAVA_WINDOWFN_URN:
case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
return (WindowFn<?, ?>)
SerializableUtils.deserializeFromByteArray(
- windowFnSpec
- .getSpec()
- .getParameter()
- .unpack(BytesValue.class)
- .getValue()
- .toByteArray(),
- "WindowFn");
+ windowFnSpec.getSpec().getPayload().toByteArray(), "WindowFn");
default:
throw new IllegalArgumentException(
"Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
}
- } catch (InvalidProtocolBufferException exc) {
+ } catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(
String.format(
"%s for %s with URN %s did not contain expected proto message for payload",
FunctionSpec.class.getSimpleName(),
WindowFn.class.getSimpleName(),
- windowFnSpec.getSpec().getUrn()));
+ windowFnSpec.getSpec().getUrn()),
+ e);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 7954b0e..aeefd4f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -25,9 +25,7 @@ import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
@@ -83,13 +81,9 @@ public class WriteFilesTranslation {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(urn)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(serializable)))
- .build())))
+ .setPayload(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(serializable)))
+ .build())
.build();
}
@@ -102,8 +96,7 @@ public class WriteFilesTranslation {
FunctionSpec.class.getSimpleName(),
sinkProto.getSpec().getUrn());
- byte[] serializedSink =
- sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+ byte[] serializedSink = sinkProto.getSpec().getPayload().toByteArray();
return (FileBasedSink<?, ?, ?>)
SerializableUtils.deserializeFromByteArray(
@@ -163,11 +156,13 @@ public class WriteFilesTranslation {
AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
transform)
throws IOException {
- return PTransformTranslation.toProto(
- transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
- .getSpec()
- .getParameter()
- .unpack(WriteFilesPayload.class);
+ return WriteFilesPayload.parseFrom(
+ PTransformTranslation.toProto(
+ transform,
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ SdkComponents.create())
+ .getSpec()
+ .getPayload());
}
static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
@@ -181,7 +176,7 @@ public class WriteFilesTranslation {
AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) {
return FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
- .setParameter(Any.pack(toProto(transform.getTransform())))
+ .setPayload(toProto(transform.getTransform()).toByteString())
.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
index 0d209a0..4f57af8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.BytesValue;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -98,8 +97,7 @@ public class CreatePCollectionViewTranslationTest {
PCollectionView<?> deserializedView =
(PCollectionView<?>)
SerializableUtils.deserializeFromByteArray(
- payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
- PCollectionView.class.getSimpleName());
+ payload.getPayload().toByteArray(), PCollectionView.class.getSimpleName());
assertThat(
deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView()));
@@ -126,7 +124,7 @@ public class CreatePCollectionViewTranslationTest {
PCollectionView<?> deserializedView =
(PCollectionView<?>)
SerializableUtils.deserializeFromByteArray(
- payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ payload.getPayload().toByteArray(),
PCollectionView.class.getSimpleName());
assertThat(
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index c31e803..680f940 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -156,8 +156,7 @@ public class ParDoTranslationTest {
// Decode
Pipeline rehydratedPipeline = Pipeline.create();
- ParDoPayload parDoPayload =
- protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+ ParDoPayload parDoPayload = ParDoPayload.parseFrom(protoTransform.getSpec().getPayload());
for (PCollectionView<?> view : parDo.getSideInputs()) {
SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
PCollectionView<?> restoredView =
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
index e4336df..893f4b9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.TestStreamPayload;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -100,8 +101,7 @@ public class TestStreamTranslationTest {
assertThat(spec.getUrn(), equalTo(TEST_STREAM_TRANSFORM_URN));
- RunnerApi.TestStreamPayload payload =
- spec.getParameter().unpack(RunnerApi.TestStreamPayload.class);
+ RunnerApi.TestStreamPayload payload = TestStreamPayload.parseFrom(spec.getPayload());
verifyTestStreamEncoding(
testStream, payload, RehydratedComponents.forComponents(components.toComponents()));
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 9afb565..fb5d47e 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -755,9 +755,12 @@ message FunctionSpec {
// passed as-is.
string urn = 1;
+ // (Deprecated)
+ google.protobuf.Any any_param = 2;
+
// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
- google.protobuf.Any parameter = 2;
+ bytes payload = 3;
}
// TODO: transfer javadoc here
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 1e611db..df0e5a2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -129,8 +129,8 @@ public class BeamFnDataReadRunner<OutputT> {
BeamFnDataClient beamFnDataClientFactory,
Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
throws IOException {
- this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
- .getApiServiceDescriptor();
+ this.apiServiceDescriptor =
+ BeamFnApi.RemoteGrpcPort.parseFrom(functionSpec.getPayload()).getApiServiceDescriptor();
this.inputTarget = inputTarget;
this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
this.beamFnDataClientFactory = beamFnDataClientFactory;
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index bbed753..48b450a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -120,8 +120,8 @@ public class BeamFnDataWriteRunner<InputT> {
Map<String, RunnerApi.Coder> coders,
BeamFnDataClient beamFnDataClientFactory)
throws IOException {
- this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
- .getApiServiceDescriptor();
+ this.apiServiceDescriptor =
+ BeamFnApi.RemoteGrpcPort.parseFrom(functionSpec.getPayload()).getApiServiceDescriptor();
this.beamFnDataClientFactory = beamFnDataClientFactory;
this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
this.outputTarget = outputTarget;
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 4702e05..5f6509f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -22,7 +22,6 @@ import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collection;
@@ -122,17 +121,14 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
public void start() throws Exception {
try {
// The representation here is defined as the java serialized representation of the
- // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper.
- byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray();
+ // bounded source object in a ByteString wrapper.
+ byte[] bytes = definition.getPayload().toByteArray();
@SuppressWarnings("unchecked")
InputT boundedSource =
(InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString());
runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
} catch (InvalidProtocolBufferException e) {
- throw new IOException(
- String.format("Failed to decode %s, expected %s",
- definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()),
- e);
+ throw new IOException(String.format("Failed to decode %s", definition.getUrn()), e);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 97bd71c..86168f9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -109,13 +107,7 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
outputMapBuilder.build();
// Get the DoFnInfo from the serialized blob.
- ByteString serializedFn;
- try {
- serializedFn = pTransform.getSpec().getParameter().unpack(BytesValue.class).getValue();
- } catch (InvalidProtocolBufferException e) {
- throw new IllegalArgumentException(
- String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e);
- }
+ ByteString serializedFn = pTransform.getSpec().getPayload();
@SuppressWarnings({"unchecked", "rawtypes"})
DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray(
serializedFn.toByteArray(), "DoFnInfo");
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index d712f5f..92e6088 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.Any;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -80,7 +79,7 @@ public class BeamFnDataReadRunnerTest {
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
- .setParameter(Any.pack(PORT_SPEC)).build();
+ .setPayload(PORT_SPEC.toByteString()).build();
private static final Coder<WindowedValue<String>> CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
private static final String CODER_SPEC_ID = "string-coder-id";
@@ -131,7 +130,7 @@ public class BeamFnDataReadRunnerTest {
RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
.setUrn("urn:org.apache.beam:source:runner:0.1")
- .setParameter(Any.pack(PORT_SPEC))
+ .setPayload(PORT_SPEC.toByteString())
.build();
RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 0caf19e..ffa3a2d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -75,7 +74,7 @@ public class BeamFnDataWriteRunnerTest {
private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
- .setParameter(Any.pack(PORT_SPEC)).build();
+ .setPayload(PORT_SPEC.toByteString()).build();
private static final String CODER_ID = "string-coder-id";
private static final Coder<WindowedValue<String>> CODER =
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
@@ -117,7 +116,7 @@ public class BeamFnDataWriteRunnerTest {
RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
.setUrn("urn:org.apache.beam:sink:runner:0.1")
- .setParameter(Any.pack(PORT_SPEC))
+ .setPayload(PORT_SPEC.toByteString())
.build();
RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index 7aec161..b9f22e8 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -31,9 +31,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -107,8 +105,7 @@ public class BoundedSourceRunnerTest {
BoundedSourceRunner<BoundedSource<Long>, Long> runner = new BoundedSourceRunner<>(
PipelineOptionsFactory.create(),
- RunnerApi.FunctionSpec.newBuilder().setParameter(
- Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(),
+ RunnerApi.FunctionSpec.newBuilder().setPayload(encodedSource).build(),
consumers);
runner.start();
@@ -127,13 +124,12 @@ public class BoundedSourceRunnerTest {
List<ThrowingRunnable> startFunctions = new ArrayList<>();
List<ThrowingRunnable> finishFunctions = new ArrayList<>();
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn("urn:org.apache.beam:source:java:0.1")
- .setParameter(Any.pack(BytesValue.newBuilder()
- .setValue(ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
- .build()))
- .build();
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn("urn:org.apache.beam:source:java:0.1")
+ .setPayload(
+ ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))))
+ .build();
RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
.setSpec(functionSpec)
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index e269bcc..efa8fcf 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -31,9 +31,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
@@ -102,12 +100,11 @@ public class FnApiDoFnRunnerTest {
ImmutableMap.of(
Long.parseLong(mainOutputId), TestDoFn.mainOutput,
Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
- RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
- .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
- .setParameter(Any.pack(BytesValue.newBuilder()
- .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
- .build()))
- .build();
+ RunnerApi.FunctionSpec functionSpec =
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
+ .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
+ .build();
RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder()
.setSpec(functionSpec)
.putInputs("inputA", "inputATarget")
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index c56ef52..7ced5a9 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -206,22 +206,21 @@ class Coder(object):
"""For internal use only; no backwards-compatibility guarantees.
"""
# TODO(BEAM-115): Use specialized URNs and components.
+ serialized_coder = serialize_coder(self)
return beam_runner_api_pb2.Coder(
spec=beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
urn=urns.PICKLED_CODER,
- parameter=proto_utils.pack_Any(
+ any_param=proto_utils.pack_Any(
google.protobuf.wrappers_pb2.BytesValue(
- value=serialize_coder(self))))))
+ value=serialized_coder)),
+ payload=serialized_coder)))
@staticmethod
def from_runner_api(proto, context):
"""For internal use only; no backwards-compatibility guarantees.
"""
- any_proto = proto.spec.spec.parameter
- bytes_proto = google.protobuf.wrappers_pb2.BytesValue()
- any_proto.Unpack(bytes_proto)
- return deserialize_coder(bytes_proto.value)
+ return deserialize_coder(proto.spec.spec.payload)
class StrUtf8Coder(Coder):
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 3222bcb..7c0c06f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -218,16 +218,16 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
def side_inputs(self):
for transform in self.transforms:
if transform.spec.urn == urns.PARDO_TRANSFORM:
- payload = proto_utils.unpack_Any(
- transform.spec.parameter, beam_runner_api_pb2.ParDoPayload)
+ payload = proto_utils.parse_Bytes(
+ transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
for side_input in payload.side_inputs:
yield transform.inputs[side_input]
def has_as_main_input(self, pcoll):
for transform in self.transforms:
if transform.spec.urn == urns.PARDO_TRANSFORM:
- payload = proto_utils.unpack_Any(
- transform.spec.parameter, beam_runner_api_pb2.ParDoPayload)
+ payload = proto_utils.parse_Bytes(
+ transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
local_side_inputs = payload.side_inputs
else:
local_side_inputs = {}
@@ -257,9 +257,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
transform = stage.transforms[0]
if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM:
# This is used later to correlate the read and write.
- param = proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=str("group:%s" % stage.name)))
+ param = str("group:%s" % stage.name)
gbk_write = Stage(
transform.unique_name + '/Write',
[beam_runner_api_pb2.PTransform(
@@ -267,7 +265,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
inputs=transform.inputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- parameter=param))],
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(value=param)),
+ payload=param))],
downstream_side_inputs=frozenset(),
must_follow=stage.must_follow)
yield gbk_write
@@ -279,7 +279,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
outputs=transform.outputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- parameter=param))],
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(value=param)),
+ payload=param))],
downstream_side_inputs=frozenset(),
must_follow=union(frozenset([gbk_write]), stage.must_follow))
else:
@@ -299,9 +301,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
transform = stage.transforms[0]
if transform.spec.urn == urns.FLATTEN_TRANSFORM:
# This is used later to correlate the read and writes.
- param = proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=str("materialize:%s" % transform.unique_name)))
+ param = str("materialize:%s" % transform.unique_name)
output_pcoll_id, = transform.outputs.values()
output_coder_id = pcollections[output_pcoll_id].coder_id
flatten_writes = []
@@ -337,7 +337,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
inputs={local_in: transcoded_pcollection},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- parameter=param))],
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=param)),
+ payload=param))],
downstream_side_inputs=frozenset(),
must_follow=stage.must_follow)
flatten_writes.append(flatten_write)
@@ -350,7 +353,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
outputs=transform.outputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- parameter=param))],
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=param)),
+ payload=param))],
downstream_side_inputs=frozenset(),
must_follow=union(frozenset(flatten_writes), stage.must_follow))
@@ -439,9 +445,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
# Now try to fuse away all pcollections.
for pcoll, producer in producers_by_pcoll.items():
- pcoll_as_param = proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=str("materialize:%s" % pcoll)))
+ pcoll_as_param = str("materialize:%s" % pcoll)
write_pcoll = None
for consumer in consumers_by_pcoll[pcoll]:
producer = replacement(producer)
@@ -461,7 +465,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
inputs={'in': pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- parameter=pcoll_as_param))])
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=pcoll_as_param)),
+ payload=pcoll_as_param))])
fuse(producer, write_pcoll)
if consumer.has_as_main_input(pcoll):
read_pcoll = Stage(
@@ -471,7 +478,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
outputs={'out': pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- parameter=pcoll_as_param))],
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=pcoll_as_param)),
+ payload=pcoll_as_param))],
must_follow={write_pcoll})
fuse(read_pcoll, consumer)
@@ -567,8 +577,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
data_side_input = {}
data_output = {}
for transform in stage.transforms:
- pcoll_id = proto_utils.unpack_Any(
- transform.spec.parameter, wrappers_pb2.BytesValue).value
+ pcoll_id = transform.spec.payload
if transform.spec.urn in (bundle_processor.DATA_INPUT_URN,
bundle_processor.DATA_OUTPUT_URN):
if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
@@ -580,9 +589,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
else:
raise NotImplementedError
if data_operation_spec:
- transform.spec.parameter.CopyFrom(data_operation_spec)
+ transform.spec.payload = data_operation_spec
+ transform.spec.any_param.CopyFrom(data_operation_spec)
else:
- transform.spec.parameter.Clear()
+ transform.spec.payload = ""
+ transform.spec.any_param.Clear()
return data_input, data_side_input, data_output
logging.info('Running %s', stage.name)
@@ -728,7 +739,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
runner_sinks[(transform_id, target_name)] = operation
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- parameter=proto_utils.pack_Any(data_operation_spec))
+ any_param=proto_utils.pack_Any(data_operation_spec),
+ payload=data_operation_spec.SerializeToString() \
+ if data_operation_spec is not None else None)
elif isinstance(operation, operation_specs.WorkerRead):
# A Read from an in-memory source is done over the data plane.
@@ -742,19 +755,23 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
operation.source.source.default_output_coder())
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- parameter=proto_utils.pack_Any(data_operation_spec))
+ any_param=proto_utils.pack_Any(data_operation_spec),
+ payload=data_operation_spec.SerializeToString() \
+ if data_operation_spec is not None else None)
else:
# Otherwise serialize the source and execute it there.
# TODO: Use SDFs with an initial impulse.
# The Dataflow runner harness strips the base64 encoding. do the same
# here until we get the same thing back that we sent in.
+ source_bytes = base64.b64decode(
+ pickler.dumps(operation.source.source))
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.PYTHON_SOURCE_URN,
- parameter=proto_utils.pack_Any(
+ any_param=proto_utils.pack_Any(
wrappers_pb2.BytesValue(
- value=base64.b64decode(
- pickler.dumps(operation.source.source)))))
+ value=source_bytes)),
+ payload=source_bytes)
elif isinstance(operation, operation_specs.WorkerDoFn):
# Record the contents of each side input for access via the state api.
@@ -773,8 +790,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
(operation.serialized_fn, side_input_extras))
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.PYTHON_DOFN_URN,
- parameter=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(value=augmented_serialized_fn)))
+ any_param=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(value=augmented_serialized_fn)),
+ payload=augmented_serialized_fn)
elif isinstance(operation, operation_specs.WorkerFlatten):
# Flatten is nice and simple.
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 9474eda..16c888c 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -282,9 +282,9 @@ class BeamTransformFactory(object):
def create_operation(self, transform_id, consumers):
transform_proto = self.descriptor.transforms[transform_id]
creator, parameter_type = self._known_urns[transform_proto.spec.urn]
- parameter = proto_utils.unpack_Any(
- transform_proto.spec.parameter, parameter_type)
- return creator(self, transform_id, transform_proto, parameter, consumers)
+ payload = proto_utils.parse_Bytes(
+ transform_proto.spec.payload, parameter_type)
+ return creator(self, transform_id, transform_proto, payload, consumers)
def get_coder(self, coder_id):
coder_proto = self.descriptor.coders[coder_id]
@@ -293,9 +293,7 @@ class BeamTransformFactory(object):
else:
# No URN, assume cloud object encoding json bytes.
return operation_specs.get_coder_from_spec(
- json.loads(
- proto_utils.unpack_Any(coder_proto.spec.spec.parameter,
- wrappers_pb2.BytesValue).value))
+ json.loads(coder_proto.spec.spec.payload))
def get_output_coders(self, transform_proto):
return {
@@ -360,10 +358,10 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers):
data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
-@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue)
+@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, None)
def create(factory, transform_id, transform_proto, parameter, consumers):
# The Dataflow runner harness strips the base64 encoding.
- source = pickler.loads(base64.b64encode(parameter.value))
+ source = pickler.loads(base64.b64encode(parameter))
spec = operation_specs.WorkerRead(
iobase.SourceBundle(1.0, source, None, None),
[WindowedValueCoder(source.default_output_coder())])
@@ -395,9 +393,9 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
consumers)
-@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, wrappers_pb2.BytesValue)
+@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, None)
def create(factory, transform_id, transform_proto, parameter, consumers):
- dofn_data = pickler.loads(parameter.value)
+ dofn_data = pickler.loads(parameter)
if len(dofn_data) == 2:
# Has side input data.
serialized_fn, side_input_data = dofn_data
@@ -413,8 +411,7 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
def create(factory, transform_id, transform_proto, parameter, consumers):
assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
- serialized_fn = proto_utils.unpack_Any(
- parameter.do_fn.spec.parameter, wrappers_pb2.BytesValue).value
+ serialized_fn = parameter.do_fn.spec.payload
dofn_data = pickler.loads(serialized_fn)
if len(dofn_data) == 2:
# Has side input data.
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3f92ce9..9018a49 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -699,24 +699,24 @@ class ParDo(PTransformWithSideInputs):
def to_runner_api_parameter(self, context):
assert self.__class__ is ParDo
+ picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
return (
urns.PARDO_TRANSFORM,
beam_runner_api_pb2.ParDoPayload(
do_fn=beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
urn=urns.PICKLED_DO_FN_INFO,
- parameter=proto_utils.pack_Any(
+ any_param=proto_utils.pack_Any(
wrappers_pb2.BytesValue(
- value=pickler.dumps(
- self._pardo_fn_data())))))))
+ value=picked_pardo_fn_data)),
+ payload=picked_pardo_fn_data))))
@PTransform.register_urn(
urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
def from_runner_api_parameter(pardo_payload, context):
assert pardo_payload.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
- proto_utils.unpack_Any(
- pardo_payload.do_fn.spec.parameter, wrappers_pb2.BytesValue).value)
+ pardo_payload.do_fn.spec.payload)
if si_tags_and_types:
raise NotImplementedError('deferred side inputs')
elif windowing:
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da113e0..a798fa1 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -442,7 +442,9 @@ class PTransform(WithTypeHints, HasDisplayData):
urn, typed_param = self.to_runner_api_parameter(context)
return beam_runner_api_pb2.FunctionSpec(
urn=urn,
- parameter=proto_utils.pack_Any(typed_param))
+ any_param=proto_utils.pack_Any(typed_param),
+ payload=typed_param.SerializeToString()
+ if typed_param is not None else None)
@classmethod
def from_runner_api(cls, proto, context):
@@ -450,7 +452,7 @@ class PTransform(WithTypeHints, HasDisplayData):
return None
parameter_type, constructor = cls._known_urns[proto.urn]
return constructor(
- proto_utils.unpack_Any(proto.parameter, parameter_type),
+ proto_utils.parse_Bytes(proto.payload, parameter_type),
context)
def to_runner_api_parameter(self, context):
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index af8f218..d7693f3 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -46,6 +46,17 @@ def unpack_Any(any_msg, msg_class):
return msg
+def parse_Bytes(bytes, msg_class):
+ """Parses the String of bytes into msg_class.
+
+ Returns the input bytes if msg_class is None."""
+ if msg_class is None:
+ return bytes
+ msg = msg_class()
+ msg.ParseFromString(bytes)
+ return msg
+
+
def pack_Struct(**kwargs):
"""Returns a struct containing the values indicated by kwargs.
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/2b9b0504/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 0013cb3..acf729f 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -120,7 +120,9 @@ class RunnerApiFn(object):
return beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
urn=urn,
- parameter=proto_utils.pack_Any(typed_param)))
+ any_param=proto_utils.pack_Any(typed_param),
+ payload=typed_param.SerializeToString()
+ if typed_param is not None else None))
@classmethod
def from_runner_api(cls, fn_proto, context):
@@ -130,5 +132,5 @@ class RunnerApiFn(object):
"""
parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
return constructor(
- proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+ proto_utils.parse_Bytes(fn_proto.spec.payload, parameter_type),
context)
[2/2] beam git commit: This closes #3193
Posted by tg...@apache.org.
This closes #3193
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ed2cf41
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ed2cf41
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ed2cf41
Branch: refs/heads/master
Commit: 9ed2cf41f1f068b3b27e9a814d6f7ceed3406dbf
Parents: f5714f2 2b9b050
Author: Thomas Groh <tg...@google.com>
Authored: Tue Aug 8 18:19:00 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Aug 8 18:19:00 2017 -0700
----------------------------------------------------------------------
.../core/construction/CoderTranslation.java | 16 +---
.../core/construction/CombineTranslation.java | 28 ++----
.../CreatePCollectionViewTranslation.java | 17 +---
.../construction/PTransformTranslation.java | 12 +--
.../core/construction/ParDoTranslation.java | 58 +++++-------
.../core/construction/PipelineTranslation.java | 12 +--
.../core/construction/ReadTranslation.java | 56 +++++------
.../construction/TestStreamTranslation.java | 5 +-
.../construction/WindowIntoTranslation.java | 9 +-
.../WindowingStrategyTranslation.java | 97 ++++++++------------
.../construction/WriteFilesTranslation.java | 29 +++---
.../CreatePCollectionViewTranslationTest.java | 6 +-
.../core/construction/ParDoTranslationTest.java | 3 +-
.../construction/TestStreamTranslationTest.java | 4 +-
.../src/main/proto/beam_runner_api.proto | 5 +-
.../beam/fn/harness/BeamFnDataReadRunner.java | 4 +-
.../beam/fn/harness/BeamFnDataWriteRunner.java | 4 +-
.../beam/fn/harness/BoundedSourceRunner.java | 10 +-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +-
.../fn/harness/BeamFnDataReadRunnerTest.java | 5 +-
.../fn/harness/BeamFnDataWriteRunnerTest.java | 5 +-
.../fn/harness/BoundedSourceRunnerTest.java | 18 ++--
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 13 +--
sdks/python/apache_beam/coders/coders.py | 11 +--
.../runners/portability/fn_api_runner.py | 78 ++++++++++------
.../runners/worker/bundle_processor.py | 21 ++---
sdks/python/apache_beam/transforms/core.py | 10 +-
.../python/apache_beam/transforms/ptransform.py | 6 +-
sdks/python/apache_beam/utils/proto_utils.py | 11 +++
sdks/python/apache_beam/utils/urns.py | 6 +-
30 files changed, 250 insertions(+), 319 deletions(-)
----------------------------------------------------------------------