Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:05:10 UTC

[44/50] beam git commit: Remove fn api bundle descriptor translation.

Remove fn api bundle descriptor translation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d6ad199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d6ad199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d6ad199

Branch: refs/heads/gearpump-runner
Commit: 5d6ad19958d0a2394f9e33720a04cc954279a7e7
Parents: 7645c44
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:44:23 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py        | 191 +------------------
 .../runners/portability/fn_api_runner_test.py   |  18 +-
 .../apache_beam/runners/worker/sdk_worker.py    | 150 ---------------
 3 files changed, 4 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
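Context for the diff below: the legacy Fn API bundle-descriptor translation (_map_task_to_fn_protos in fn_api_runner.py and create_execution_tree_from_fn_api in sdk_worker.py) is deleted, and the use_runner_protos constructor flag goes with it, so FnApiRunner() now always translates map tasks through the runner-API protos. A minimal usage sketch under that reading; the pipeline contents are illustrative and not part of this commit:

  import apache_beam as beam
  from apache_beam.runners.portability import fn_api_runner

  # The constructor no longer takes use_runner_protos; the runner-proto
  # translation is the only remaining path.
  p = beam.Pipeline(runner=fn_api_runner.FnApiRunner())
  p | beam.Create(['a', 'b', 'c']) | beam.Map(lambda x: x.upper())
  p.run()

This is the same no-argument construction exercised by the merged FnApiRunnerTest in the test diff further down.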


http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a27e293..b45ff76 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -115,13 +115,9 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
 
 class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
-  def __init__(self, use_runner_protos=False):
+  def __init__(self):
     super(FnApiRunner, self).__init__()
     self._last_uid = -1
-    if use_runner_protos:
-      self._map_task_to_protos = self._map_task_to_runner_protos
-    else:
-      self._map_task_to_protos = self._map_task_to_fn_protos
 
   def has_metrics_support(self):
     return False
@@ -145,7 +141,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             process_bundle_descriptor=[process_bundle_descriptor])
         ), runner_sinks, input_data
 
-  def _map_task_to_runner_protos(self, map_task, data_operation_spec):
+  def _map_task_to_protos(self, map_task, data_operation_spec):
     input_data = {}
     side_input_data = {}
     runner_sinks = {}
@@ -265,189 +261,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         environments=dict(context_proto.environments.items()))
     return input_data, side_input_data, runner_sinks, process_bundle_descriptor
 
-  def _map_task_to_fn_protos(self, map_task, data_operation_spec):
-
-    input_data = {}
-    side_input_data = {}
-    runner_sinks = {}
-    transforms = []
-    transform_index_to_id = {}
-
-    # Maps coders to new coder objects and references.
-    coders = {}
-
-    def coder_id(coder):
-      if coder not in coders:
-        coders[coder] = beam_fn_api_pb2.Coder(
-            function_spec=sdk_worker.pack_function_spec_data(
-                json.dumps(coder.as_cloud_object()),
-                sdk_worker.PYTHON_CODER_URN, id=self._next_uid()))
-
-      return coders[coder].function_spec.id
-
-    def output_tags(op):
-      return getattr(op, 'output_tags', ['out'])
-
-    def as_target(op_input):
-      input_op_index, input_output_index = op_input
-      input_op = map_task[input_op_index][1]
-      return {
-          'ignored_input_tag':
-              beam_fn_api_pb2.Target.List(target=[
-                  beam_fn_api_pb2.Target(
-                      primitive_transform_reference=transform_index_to_id[
-                          input_op_index],
-                      name=output_tags(input_op)[input_output_index])
-              ])
-      }
-
-    def outputs(op):
-      return {
-          tag: beam_fn_api_pb2.PCollection(coder_reference=coder_id(coder))
-          for tag, coder in zip(output_tags(op), op.output_coders)
-      }
-
-    for op_ix, (stage_name, operation) in enumerate(map_task):
-      transform_id = transform_index_to_id[op_ix] = self._next_uid()
-      if isinstance(operation, operation_specs.WorkerInMemoryWrite):
-        # Write this data back to the runner.
-        fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_OUTPUT_URN,
-                                          id=self._next_uid())
-        if data_operation_spec:
-          fn.data.Pack(data_operation_spec)
-        inputs = as_target(operation.input)
-        side_inputs = {}
-        runner_sinks[(transform_id, 'out')] = operation
-
-      elif isinstance(operation, operation_specs.WorkerRead):
-        # A Read is either translated to a direct injection of windowed values
-        # into the sdk worker, or an injection of the source object into the
-        # sdk worker as data followed by an SDF that reads that source.
-        if (isinstance(operation.source.source,
-                       maptask_executor_runner.InMemorySource)
-            and isinstance(operation.source.source.default_output_coder(),
-                           WindowedValueCoder)):
-          output_stream = create_OutputStream()
-          element_coder = (
-              operation.source.source.default_output_coder().get_impl())
-          # Re-encode the elements in the nested context and
-          # concatenate them together
-          for element in operation.source.source.read(None):
-            element_coder.encode_to_stream(element, output_stream, True)
-          target_name = self._next_uid()
-          input_data[(transform_id, target_name)] = output_stream.get()
-          fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_INPUT_URN,
-                                            id=self._next_uid())
-          if data_operation_spec:
-            fn.data.Pack(data_operation_spec)
-          inputs = {target_name: beam_fn_api_pb2.Target.List()}
-          side_inputs = {}
-        else:
-          # Read the source object from the runner.
-          source_coder = beam.coders.DillCoder()
-          input_transform_id = self._next_uid()
-          output_stream = create_OutputStream()
-          source_coder.get_impl().encode_to_stream(
-              GlobalWindows.windowed_value(operation.source),
-              output_stream,
-              True)
-          target_name = self._next_uid()
-          input_data[(input_transform_id, target_name)] = output_stream.get()
-          input_ptransform = beam_fn_api_pb2.PrimitiveTransform(
-              id=input_transform_id,
-              function_spec=beam_fn_api_pb2.FunctionSpec(
-                  urn=sdk_worker.DATA_INPUT_URN,
-                  id=self._next_uid()),
-              # TODO(robertwb): Possible name collision.
-              step_name=stage_name + '/inject_source',
-              inputs={target_name: beam_fn_api_pb2.Target.List()},
-              outputs={
-                  'out':
-                      beam_fn_api_pb2.PCollection(
-                          coder_reference=coder_id(source_coder))
-              })
-          if data_operation_spec:
-            input_ptransform.function_spec.data.Pack(data_operation_spec)
-          transforms.append(input_ptransform)
-
-          # Read the elements out of the source.
-          fn = sdk_worker.pack_function_spec_data(
-              OLDE_SOURCE_SPLITTABLE_DOFN_DATA,
-              sdk_worker.PYTHON_DOFN_URN,
-              id=self._next_uid())
-          inputs = {
-              'ignored_input_tag':
-                  beam_fn_api_pb2.Target.List(target=[
-                      beam_fn_api_pb2.Target(
-                          primitive_transform_reference=input_transform_id,
-                          name='out')
-                  ])
-          }
-          side_inputs = {}
-
-      elif isinstance(operation, operation_specs.WorkerDoFn):
-        fn = sdk_worker.pack_function_spec_data(
-            operation.serialized_fn,
-            sdk_worker.PYTHON_DOFN_URN,
-            id=self._next_uid())
-        inputs = as_target(operation.input)
-        # Store the contents of each side input for state access.
-        for si in operation.side_inputs:
-          assert isinstance(si.source, iobase.BoundedSource)
-          element_coder = si.source.default_output_coder()
-          view_id = self._next_uid()
-          # TODO(robertwb): Actually flesh out the ViewFn API.
-          side_inputs[si.tag] = beam_fn_api_pb2.SideInput(
-              view_fn=sdk_worker.serialize_and_pack_py_fn(
-                  element_coder, urn=sdk_worker.PYTHON_ITERABLE_VIEWFN_URN,
-                  id=view_id))
-          # Re-encode the elements in the nested context and
-          # concatenate them together
-          output_stream = create_OutputStream()
-          for element in si.source.read(
-              si.source.get_range_tracker(None, None)):
-            element_coder.get_impl().encode_to_stream(
-                element, output_stream, True)
-          elements_data = output_stream.get()
-          side_input_data[view_id] = elements_data
-
-      elif isinstance(operation, operation_specs.WorkerFlatten):
-        fn = sdk_worker.pack_function_spec_data(
-            operation.serialized_fn,
-            sdk_worker.IDENTITY_DOFN_URN,
-            id=self._next_uid())
-        inputs = {
-            'ignored_input_tag':
-                beam_fn_api_pb2.Target.List(target=[
-                    beam_fn_api_pb2.Target(
-                        primitive_transform_reference=transform_index_to_id[
-                            input_op_index],
-                        name=output_tags(map_task[input_op_index][1])[
-                            input_output_index])
-                    for input_op_index, input_output_index in operation.inputs
-                ])
-        }
-        side_inputs = {}
-
-      else:
-        raise TypeError(operation)
-
-      ptransform = beam_fn_api_pb2.PrimitiveTransform(
-          id=transform_id,
-          function_spec=fn,
-          step_name=stage_name,
-          inputs=inputs,
-          side_inputs=side_inputs,
-          outputs=outputs(operation))
-      transforms.append(ptransform)
-
-    process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
-        id=self._next_uid(),
-        coders=coders.values(),
-        primitive_transform=transforms)
-
-    return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
   def _run_map_task(
       self, map_task, control_handler, state_handler, data_plane_handler,
       data_operation_spec):

http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e2eae26..9159035 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,26 +23,12 @@ from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
 
 
-class FnApiRunnerTestWithRunnerProtos(
+class FnApiRunnerTest(
     maptask_executor_runner_test.MapTaskExecutorRunnerTest):
 
   def create_pipeline(self):
     return beam.Pipeline(
-        runner=fn_api_runner.FnApiRunner(use_runner_protos=True))
-
-  def test_combine_per_key(self):
-    # TODO(robertwb): Implement PGBKCV operation.
-    pass
-
-  # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
-
-
-class FnApiRunnerTestWithFnProtos(
-    maptask_executor_runner_test.MapTaskExecutorRunnerTest):
-
-  def create_pipeline(self):
-    return beam.Pipeline(
-        runner=fn_api_runner.FnApiRunner(use_runner_protos=False))
+        runner=fn_api_runner.FnApiRunner())
 
   def test_combine_per_key(self):
     # TODO(robertwb): Implement PGBKCV operation.

http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index a2c9f42..d135984 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -196,25 +196,6 @@ def pack_function_spec_data(value, urn, id=None):
 # pylint: enable=redefined-builtin
 
 
-# TODO(vikasrk): Consistently use same format everywhere.
-def load_compressed(compressed_data):
-  """Returns a decompressed and deserialized python object."""
-  # Note: SDK uses ``pickler.dumps`` to serialize certain python objects
-  # (like sources), which involves serialization, compression and base64
-  # encoding. We cannot directly use ``pickler.loads`` for
-  # deserialization, as the runner would have already base64 decoded the
-  # data. So we only need to decompress and deserialize.
-
-  data = zlib.decompress(compressed_data)
-  try:
-    return dill.loads(data)
-  except Exception:          # pylint: disable=broad-except
-    dill.dill._trace(True)   # pylint: disable=protected-access
-    return dill.loads(data)
-  finally:
-    dill.dill._trace(False)  # pylint: disable=protected-access
-
-
 def memoize(func):
   cache = {}
   missing = object()
@@ -324,12 +305,6 @@ class SdkWorker(object):
     return response
 
   def create_execution_tree(self, descriptor):
-    if descriptor.transforms:
-      return self.create_execution_tree_from_runner_api(descriptor)
-    else:
-      return self.create_execution_tree_from_fn_api(descriptor)
-
-  def create_execution_tree_from_runner_api(self, descriptor):
     # TODO(robertwb): Figure out the correct prefix to use for output counters
     # from StateSampler.
     counter_factory = counters.CounterFactory()
@@ -368,131 +343,6 @@ class SdkWorker(object):
             for transform_id in sorted(
                 descriptor.transforms, key=topological_height, reverse=True)]
 
-  def create_execution_tree_from_fn_api(self, descriptor):
-    # TODO(vikasrk): Add an id field to Coder proto and use that instead.
-    coders = {coder.function_spec.id: operation_specs.get_coder_from_spec(
-        json.loads(unpack_function_spec_data(coder.function_spec)))
-              for coder in descriptor.coders}
-
-    counter_factory = counters.CounterFactory()
-    # TODO(robertwb): Figure out the correct prefix to use for output counters
-    # from StateSampler.
-    state_sampler = statesampler.StateSampler(
-        'fnapi-step%s-' % descriptor.id, counter_factory)
-    consumers = collections.defaultdict(lambda: collections.defaultdict(list))
-    ops_by_id = {}
-    reversed_ops = []
-
-    for transform in reversed(descriptor.primitive_transform):
-      # TODO(robertwb): Figure out how to plumb through the operation name (e.g.
-      # "s3") from the service through the FnAPI so that msec counters can be
-      # reported and correctly plumbed through the service and the UI.
-      operation_name = 'fnapis%s' % transform.id
-
-      def only_element(iterable):
-        element, = iterable
-        return element
-
-      if transform.function_spec.urn == DATA_OUTPUT_URN:
-        target = beam_fn_api_pb2.Target(
-            primitive_transform_reference=transform.id,
-            name=only_element(transform.outputs.keys()))
-
-        op = DataOutputOperation(
-            operation_name,
-            transform.step_name,
-            consumers[transform.id],
-            counter_factory,
-            state_sampler,
-            coders[only_element(transform.outputs.values()).coder_reference],
-            target,
-            self.data_channel_factory.create_data_channel(
-                transform.function_spec))
-
-      elif transform.function_spec.urn == DATA_INPUT_URN:
-        target = beam_fn_api_pb2.Target(
-            primitive_transform_reference=transform.id,
-            name=only_element(transform.inputs.keys()))
-        op = DataInputOperation(
-            operation_name,
-            transform.step_name,
-            consumers[transform.id],
-            counter_factory,
-            state_sampler,
-            coders[only_element(transform.outputs.values()).coder_reference],
-            target,
-            self.data_channel_factory.create_data_channel(
-                transform.function_spec))
-
-      elif transform.function_spec.urn == PYTHON_DOFN_URN:
-        def create_side_input(tag, si):
-          # TODO(robertwb): Extract windows (and keys) out of element data.
-          return operation_specs.WorkerSideInputSource(
-              tag=tag,
-              source=SideInputSource(
-                  self.state_handler,
-                  beam_fn_api_pb2.StateKey.MultimapSideInput(
-                      key=si.view_fn.id.encode('utf-8')),
-                  coder=unpack_and_deserialize_py_fn(si.view_fn)))
-        output_tags = list(transform.outputs.keys())
-        spec = operation_specs.WorkerDoFn(
-            serialized_fn=unpack_function_spec_data(transform.function_spec),
-            output_tags=output_tags,
-            input=None,
-            side_inputs=[create_side_input(tag, si)
-                         for tag, si in transform.side_inputs.items()],
-            output_coders=[coders[transform.outputs[out].coder_reference]
-                           for out in output_tags])
-
-        op = operations.DoOperation(operation_name, spec, counter_factory,
-                                    state_sampler)
-        # TODO(robertwb): Move these to the constructor.
-        op.step_name = transform.step_name
-        for tag, op_consumers in consumers[transform.id].items():
-          for consumer in op_consumers:
-            op.add_receiver(
-                consumer, output_tags.index(tag))
-
-      elif transform.function_spec.urn == IDENTITY_DOFN_URN:
-        op = operations.FlattenOperation(operation_name, None, counter_factory,
-                                         state_sampler)
-        # TODO(robertwb): Move these to the constructor.
-        op.step_name = transform.step_name
-        for tag, op_consumers in consumers[transform.id].items():
-          for consumer in op_consumers:
-            op.add_receiver(consumer, 0)
-
-      elif transform.function_spec.urn == PYTHON_SOURCE_URN:
-        source = load_compressed(unpack_function_spec_data(
-            transform.function_spec))
-        # TODO(vikasrk): Remove this once custom source is implemented with
-        # splittable dofn via the data plane.
-        spec = operation_specs.WorkerRead(
-            iobase.SourceBundle(1.0, source, None, None),
-            [WindowedValueCoder(source.default_output_coder())])
-        op = operations.ReadOperation(operation_name, spec, counter_factory,
-                                      state_sampler)
-        op.step_name = transform.step_name
-        output_tags = list(transform.outputs.keys())
-        for tag, op_consumers in consumers[transform.id].items():
-          for consumer in op_consumers:
-            op.add_receiver(
-                consumer, output_tags.index(tag))
-
-      else:
-        raise NotImplementedError
-
-      # Record consumers.
-      for _, inputs in transform.inputs.items():
-        for target in inputs.target:
-          consumers[target.primitive_transform_reference][target.name].append(
-              op)
-
-      reversed_ops.append(op)
-      ops_by_id[transform.id] = op
-
-    return list(reversed(reversed_ops))
-
   def process_bundle(self, request, instruction_id):
     ops = self.create_execution_tree(
         self.fns[request.process_bundle_descriptor_reference])