You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/10/05 16:46:07 UTC
[2/2] beam git commit: Remove any_param field from FunctionSpec
Remove any_param field from FunctionSpec
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/060bda23
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/060bda23
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/060bda23
Branch: refs/heads/master
Commit: 060bda23d1e5cd5146190aa34f2e212404cb6667
Parents: 294518e
Author: Thomas Groh <tg...@google.com>
Authored: Tue Sep 19 16:39:44 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Oct 5 09:45:44 2017 -0700
----------------------------------------------------------------------
.../WindowingStrategyTranslation.java | 7 ------
.../src/main/proto/beam_runner_api.proto | 3 ---
sdks/python/apache_beam/coders/coders.py | 1 -
.../runners/portability/fn_api_runner.py | 26 --------------------
sdks/python/apache_beam/transforms/core.py | 4 ---
.../python/apache_beam/transforms/ptransform.py | 1 -
sdks/python/apache_beam/utils/urns.py | 1 -
7 files changed, 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 1b4786c..be8601c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -17,9 +17,7 @@
*/
package org.apache.beam.runners.core.construction;
-import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
@@ -223,7 +221,6 @@ public class WindowingStrategyTranslation implements Serializable {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
- .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
.setPayload(serializedFn)
.build())
.build();
@@ -241,7 +238,6 @@ public class WindowingStrategyTranslation implements Serializable {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(FIXED_WINDOWS_FN)
- .setAnyParam(Any.pack(fixedWindowsPayload))
.setPayload(fixedWindowsPayload.toByteString()))
.build();
} else if (windowFn instanceof SlidingWindows) {
@@ -254,7 +250,6 @@ public class WindowingStrategyTranslation implements Serializable {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SLIDING_WINDOWS_FN)
- .setAnyParam(Any.pack(slidingWindowsPayload))
.setPayload(slidingWindowsPayload.toByteString()))
.build();
} else if (windowFn instanceof Sessions) {
@@ -266,7 +261,6 @@ public class WindowingStrategyTranslation implements Serializable {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SESSION_WINDOWS_FN)
- .setAnyParam(Any.pack(sessionsPayload))
.setPayload(sessionsPayload.toByteString()))
.build();
} else {
@@ -274,7 +268,6 @@ public class WindowingStrategyTranslation implements Serializable {
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
- .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
.setPayload(serializedFn))
.build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 9ba5577..74f3897 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -782,9 +782,6 @@ message FunctionSpec {
// passed as-is.
string urn = 1;
- // (Deprecated)
- google.protobuf.Any any_param = 2;
-
// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
bytes payload = 3;
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 3021da5..cbea98f 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -234,7 +234,6 @@ class Coder(object):
spec=beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
urn=urn,
- any_param=proto_utils.pack_Any(typed_param),
payload=typed_param.SerializeToString()
if typed_param is not None else None)),
component_coder_ids=[context.coders.get_id(c) for c in components])
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 21bf61a..20a4a61 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -27,7 +27,6 @@ import time
from concurrent import futures
import grpc
-from google.protobuf import wrappers_pb2
import apache_beam as beam # pylint: disable=ungrouped-imports
from apache_beam.coders import WindowedValueCoder
@@ -349,8 +348,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
inputs=transform.inputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(value=param)),
payload=param))],
downstream_side_inputs=frozenset(),
must_follow=stage.must_follow)
@@ -363,8 +360,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
outputs=transform.outputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(value=param)),
payload=param))],
downstream_side_inputs=frozenset(),
must_follow=union(frozenset([gbk_write]), stage.must_follow))
@@ -421,9 +416,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
inputs={local_in: transcoded_pcollection},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=param)),
payload=param))],
downstream_side_inputs=frozenset(),
must_follow=stage.must_follow)
@@ -437,9 +429,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
outputs=transform.outputs,
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=param)),
payload=param))],
downstream_side_inputs=frozenset(),
must_follow=union(frozenset(flatten_writes), stage.must_follow))
@@ -549,9 +538,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
inputs={'in': pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=pcoll_as_param)),
payload=pcoll_as_param))])
fuse(producer, write_pcoll)
if consumer.has_as_main_input(pcoll):
@@ -562,9 +548,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
outputs={'out': pcoll},
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=pcoll_as_param)),
payload=pcoll_as_param))],
must_follow={write_pcoll})
fuse(read_pcoll, consumer)
@@ -686,10 +669,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
raise NotImplementedError
if data_operation_spec:
transform.spec.payload = data_operation_spec.SerializeToString()
- transform.spec.any_param.Pack(data_operation_spec)
else:
transform.spec.payload = ""
- transform.spec.any_param.Clear()
return data_input, data_side_input, data_output
logging.info('Running %s', stage.name)
@@ -838,7 +819,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
runner_sinks[(transform_id, target_name)] = operation
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_OUTPUT_URN,
- any_param=proto_utils.pack_Any(data_operation_spec),
payload=data_operation_spec.SerializeToString() \
if data_operation_spec is not None else None)
@@ -854,7 +834,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
operation.source.source.default_output_coder())
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
- any_param=proto_utils.pack_Any(data_operation_spec),
payload=data_operation_spec.SerializeToString() \
if data_operation_spec is not None else None)
@@ -867,9 +846,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
pickler.dumps(operation.source.source))
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.PYTHON_SOURCE_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=source_bytes)),
payload=source_bytes)
elif isinstance(operation, operation_specs.WorkerDoFn):
@@ -889,8 +865,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
(operation.serialized_fn, side_input_extras))
transform_spec = beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.PYTHON_DOFN_URN,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(value=augmented_serialized_fn)),
payload=augmented_serialized_fn)
elif isinstance(operation, operation_specs.WorkerFlatten):
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5d92fe9..153dc32 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -53,7 +53,6 @@ from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.decorators import get_type_hints
from apache_beam.typehints.trivial_inference import element_type
from apache_beam.typehints.typehints import is_consistent_with
-from apache_beam.utils import proto_utils
from apache_beam.utils import urns
__all__ = [
@@ -715,9 +714,6 @@ class ParDo(PTransformWithSideInputs):
do_fn=beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
urn=urns.PICKLED_DO_FN_INFO,
- any_param=proto_utils.pack_Any(
- wrappers_pb2.BytesValue(
- value=picked_pardo_fn_data)),
payload=picked_pardo_fn_data))))
@PTransform.register_urn(
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 7cf1441..2e6255a 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -452,7 +452,6 @@ class PTransform(WithTypeHints, HasDisplayData):
urn, typed_param = self.to_runner_api_parameter(context)
return beam_runner_api_pb2.FunctionSpec(
urn=urn,
- any_param=proto_utils.pack_Any(typed_param),
payload=typed_param.SerializeToString()
if typed_param is not None else None)
http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 7675d05..2aeaa53 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -128,7 +128,6 @@ class RunnerApiFn(object):
return beam_runner_api_pb2.SdkFunctionSpec(
spec=beam_runner_api_pb2.FunctionSpec(
urn=urn,
- any_param=proto_utils.pack_Any(typed_param),
payload=typed_param.SerializeToString()
if typed_param is not None else None))