You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/10/05 16:46:07 UTC

[2/2] beam git commit: Remove any_param field from FunctionSpec

Remove any_param field from FunctionSpec


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/060bda23
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/060bda23
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/060bda23

Branch: refs/heads/master
Commit: 060bda23d1e5cd5146190aa34f2e212404cb6667
Parents: 294518e
Author: Thomas Groh <tg...@google.com>
Authored: Tue Sep 19 16:39:44 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Oct 5 09:45:44 2017 -0700

----------------------------------------------------------------------
 .../WindowingStrategyTranslation.java           |  7 ------
 .../src/main/proto/beam_runner_api.proto        |  3 ---
 sdks/python/apache_beam/coders/coders.py        |  1 -
 .../runners/portability/fn_api_runner.py        | 26 --------------------
 sdks/python/apache_beam/transforms/core.py      |  4 ---
 .../python/apache_beam/transforms/ptransform.py |  1 -
 sdks/python/apache_beam/utils/urns.py           |  1 -
 7 files changed, 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 1b4786c..be8601c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -17,9 +17,7 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.Durations;
 import com.google.protobuf.util.Timestamps;
@@ -223,7 +221,6 @@ public class WindowingStrategyTranslation implements Serializable {
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
-                  .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
                   .setPayload(serializedFn)
                   .build())
           .build();
@@ -241,7 +238,6 @@ public class WindowingStrategyTranslation implements Serializable {
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(FIXED_WINDOWS_FN)
-                  .setAnyParam(Any.pack(fixedWindowsPayload))
                   .setPayload(fixedWindowsPayload.toByteString()))
           .build();
     } else if (windowFn instanceof SlidingWindows) {
@@ -254,7 +250,6 @@ public class WindowingStrategyTranslation implements Serializable {
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(SLIDING_WINDOWS_FN)
-                  .setAnyParam(Any.pack(slidingWindowsPayload))
                   .setPayload(slidingWindowsPayload.toByteString()))
           .build();
     } else if (windowFn instanceof Sessions) {
@@ -266,7 +261,6 @@ public class WindowingStrategyTranslation implements Serializable {
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(SESSION_WINDOWS_FN)
-                  .setAnyParam(Any.pack(sessionsPayload))
                   .setPayload(sessionsPayload.toByteString()))
           .build();
     } else {
@@ -274,7 +268,6 @@ public class WindowingStrategyTranslation implements Serializable {
           .setSpec(
               FunctionSpec.newBuilder()
                   .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
-                  .setAnyParam(Any.pack(BytesValue.newBuilder().setValue(serializedFn).build()))
                   .setPayload(serializedFn))
           .build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 9ba5577..74f3897 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -782,9 +782,6 @@ message FunctionSpec {
   // passed as-is.
   string urn = 1;
 
-  // (Deprecated)
-  google.protobuf.Any any_param = 2;
-
   // (Optional) The data specifying any parameters to the URN. If
   // the URN does not require any arguments, this may be omitted.
   bytes payload = 3;

http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 3021da5..cbea98f 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -234,7 +234,6 @@ class Coder(object):
         spec=beam_runner_api_pb2.SdkFunctionSpec(
             spec=beam_runner_api_pb2.FunctionSpec(
                 urn=urn,
-                any_param=proto_utils.pack_Any(typed_param),
                 payload=typed_param.SerializeToString()
                 if typed_param is not None else None)),
         component_coder_ids=[context.coders.get_id(c) for c in components])

http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 21bf61a..20a4a61 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -27,7 +27,6 @@ import time
 from concurrent import futures
 
 import grpc
-from google.protobuf import wrappers_pb2
 
 import apache_beam as beam  # pylint: disable=ungrouped-imports
 from apache_beam.coders import WindowedValueCoder
@@ -349,8 +348,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                   inputs=transform.inputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_OUTPUT_URN,
-                      any_param=proto_utils.pack_Any(
-                          wrappers_pb2.BytesValue(value=param)),
                       payload=param))],
               downstream_side_inputs=frozenset(),
               must_follow=stage.must_follow)
@@ -363,8 +360,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      any_param=proto_utils.pack_Any(
-                          wrappers_pb2.BytesValue(value=param)),
                       payload=param))],
               downstream_side_inputs=frozenset(),
               must_follow=union(frozenset([gbk_write]), stage.must_follow))
@@ -421,9 +416,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                     inputs={local_in: transcoded_pcollection},
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_OUTPUT_URN,
-                        any_param=proto_utils.pack_Any(
-                            wrappers_pb2.BytesValue(
-                                value=param)),
                         payload=param))],
                 downstream_side_inputs=frozenset(),
                 must_follow=stage.must_follow)
@@ -437,9 +429,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      any_param=proto_utils.pack_Any(
-                          wrappers_pb2.BytesValue(
-                              value=param)),
                       payload=param))],
               downstream_side_inputs=frozenset(),
               must_follow=union(frozenset(flatten_writes), stage.must_follow))
@@ -549,9 +538,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                       inputs={'in': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_OUTPUT_URN,
-                          any_param=proto_utils.pack_Any(
-                              wrappers_pb2.BytesValue(
-                                  value=pcoll_as_param)),
                           payload=pcoll_as_param))])
               fuse(producer, write_pcoll)
             if consumer.has_as_main_input(pcoll):
@@ -562,9 +548,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                       outputs={'out': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
-                          any_param=proto_utils.pack_Any(
-                              wrappers_pb2.BytesValue(
-                                  value=pcoll_as_param)),
                           payload=pcoll_as_param))],
                   must_follow={write_pcoll})
               fuse(read_pcoll, consumer)
@@ -686,10 +669,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             raise NotImplementedError
           if data_operation_spec:
             transform.spec.payload = data_operation_spec.SerializeToString()
-            transform.spec.any_param.Pack(data_operation_spec)
           else:
             transform.spec.payload = ""
-            transform.spec.any_param.Clear()
       return data_input, data_side_input, data_output
 
     logging.info('Running %s', stage.name)
@@ -838,7 +819,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         runner_sinks[(transform_id, target_name)] = operation
         transform_spec = beam_runner_api_pb2.FunctionSpec(
             urn=bundle_processor.DATA_OUTPUT_URN,
-            any_param=proto_utils.pack_Any(data_operation_spec),
             payload=data_operation_spec.SerializeToString() \
                 if data_operation_spec is not None else None)
 
@@ -854,7 +834,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
               operation.source.source.default_output_coder())
           transform_spec = beam_runner_api_pb2.FunctionSpec(
               urn=bundle_processor.DATA_INPUT_URN,
-              any_param=proto_utils.pack_Any(data_operation_spec),
               payload=data_operation_spec.SerializeToString() \
                   if data_operation_spec is not None else None)
 
@@ -867,9 +846,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
               pickler.dumps(operation.source.source))
           transform_spec = beam_runner_api_pb2.FunctionSpec(
               urn=bundle_processor.PYTHON_SOURCE_URN,
-              any_param=proto_utils.pack_Any(
-                  wrappers_pb2.BytesValue(
-                      value=source_bytes)),
               payload=source_bytes)
 
       elif isinstance(operation, operation_specs.WorkerDoFn):
@@ -889,8 +865,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             (operation.serialized_fn, side_input_extras))
         transform_spec = beam_runner_api_pb2.FunctionSpec(
             urn=bundle_processor.PYTHON_DOFN_URN,
-            any_param=proto_utils.pack_Any(
-                wrappers_pb2.BytesValue(value=augmented_serialized_fn)),
             payload=augmented_serialized_fn)
 
       elif isinstance(operation, operation_specs.WorkerFlatten):

http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5d92fe9..153dc32 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -53,7 +53,6 @@ from apache_beam.typehints.decorators import WithTypeHints
 from apache_beam.typehints.decorators import get_type_hints
 from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.typehints.typehints import is_consistent_with
-from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
 
 __all__ = [
@@ -715,9 +714,6 @@ class ParDo(PTransformWithSideInputs):
             do_fn=beam_runner_api_pb2.SdkFunctionSpec(
                 spec=beam_runner_api_pb2.FunctionSpec(
                     urn=urns.PICKLED_DO_FN_INFO,
-                    any_param=proto_utils.pack_Any(
-                        wrappers_pb2.BytesValue(
-                            value=picked_pardo_fn_data)),
                     payload=picked_pardo_fn_data))))
 
   @PTransform.register_urn(

http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 7cf1441..2e6255a 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -452,7 +452,6 @@ class PTransform(WithTypeHints, HasDisplayData):
     urn, typed_param = self.to_runner_api_parameter(context)
     return beam_runner_api_pb2.FunctionSpec(
         urn=urn,
-        any_param=proto_utils.pack_Any(typed_param),
         payload=typed_param.SerializeToString()
         if typed_param is not None else None)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/060bda23/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 7675d05..2aeaa53 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -128,7 +128,6 @@ class RunnerApiFn(object):
     return beam_runner_api_pb2.SdkFunctionSpec(
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=urn,
-            any_param=proto_utils.pack_Any(typed_param),
             payload=typed_param.SerializeToString()
             if typed_param is not None else None))