Posted to commits@beam.apache.org by ro...@apache.org on 2017/11/22 01:02:52 UTC

[1/2] beam git commit: Closes #4104

Repository: beam
Updated Branches:
  refs/heads/master 969c94390 -> b548e1f0f


Closes #4104


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b548e1f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b548e1f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b548e1f0

Branch: refs/heads/master
Commit: b548e1f0f526c38d0e6a7478c65600499fef33df
Parents: 969c943 212876b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 21 17:00:59 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 17:00:59 2017 -0800

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py        | 236 +------------------
 .../runners/portability/fn_api_runner_test.py   |  10 +-
 2 files changed, 11 insertions(+), 235 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Remove obsolete dependence of FnApiRunner on MapTaskExecutorRunner.

Posted by ro...@apache.org.
Remove obsolete dependence of FnApiRunner on MapTaskExecutorRunner.
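
With this change FnApiRunner derives from runner.PipelineRunner instead of
maptask_executor_runner.MapTaskExecutorRunner, and every pipeline is executed
through the Runner API path. The following is a minimal sketch of the resulting
class shape, assembled only from the hunks below; the MetricsEnvironment import
path is assumed from the surrounding module, and run_via_runner_api() plus the
rest of the class are omitted:

  from apache_beam.metrics.execution import MetricsEnvironment  # assumed import path
  from apache_beam.runners import runner


  class FnApiRunner(runner.PipelineRunner):
    # Previously: class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner)

    def run(self, pipeline):
      # Metrics are disabled unconditionally; the has_metrics_support()
      # override and the MapTaskExecutorRunner fallback path are gone.
      MetricsEnvironment.set_metrics_supported(False)
      # Every pipeline is converted to its Runner API proto and executed
      # via run_via_runner_api(); there is no legacy map-task path.
      return self.run_via_runner_api(pipeline.to_runner_api())


  class RunnerResult(runner.PipelineResult):
    # Previously extended maptask_executor_runner.WorkerRunnerResult.

    def __init__(self, state, metrics_by_stage):
      super(RunnerResult, self).__init__(state)
      self._metrics_by_stage = metrics_by_stage

    def wait_until_finish(self, duration=None):
      # Execution is synchronous, so there is nothing left to wait for.
      pass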


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/212876b8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/212876b8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/212876b8

Branch: refs/heads/master
Commit: 212876b829e372a37b854c08460095f77a1ae547
Parents: 969c943
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Nov 8 16:24:38 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 17:00:59 2017 -0800

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py        | 236 +------------------
 .../runners/portability/fn_api_runner_test.py   |  10 +-
 2 files changed, 11 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/212876b8/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 838ce1e..674d523 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -17,7 +17,6 @@
 
 """A PipelineRunner using the SDK harness.
 """
-import base64
 import collections
 import copy
 import logging
@@ -40,11 +39,9 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import maptask_executor_runner
-from apache_beam.runners.runner import PipelineState
+from apache_beam.runners import runner
 from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import data_plane
-from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.transforms import trigger
 from apache_beam.transforms.window import GlobalWindows
@@ -185,7 +182,7 @@ class _WindowGroupingBuffer(object):
       yield encoded_window, output_stream.get()
 
 
-class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
+class FnApiRunner(runner.PipelineRunner):
 
   def __init__(self, use_grpc=False, sdk_harness_factory=None):
     super(FnApiRunner, self).__init__()
@@ -195,19 +192,13 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       raise ValueError('GRPC must be used if a harness factory is provided.')
     self._sdk_harness_factory = sdk_harness_factory
 
-  def has_metrics_support(self):
-    return False
-
   def _next_uid(self):
     self._last_uid += 1
     return str(self._last_uid)
 
   def run(self, pipeline):
-    MetricsEnvironment.set_metrics_supported(self.has_metrics_support())
-    if pipeline._verify_runner_api_compatible():
-      return self.run_via_runner_api(pipeline.to_runner_api())
-    else:
-      return super(FnApiRunner, self).run(pipeline)
+    MetricsEnvironment.set_metrics_supported(False)
+    return self.run_via_runner_api(pipeline.to_runner_api())
 
   def run_via_runner_api(self, pipeline_proto):
     return self.run_stages(*self.create_stages(pipeline_proto))
@@ -680,7 +671,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
     finally:
       controller.close()
 
-    return RunnerResult(PipelineState.DONE, metrics_by_stage)
+    return RunnerResult(runner.PipelineState.DONE, metrics_by_stage)
 
   def run_stage(
       self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
@@ -817,218 +808,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
           raise NotImplementedError(pcoll_id)
     return result
 
-  # This is the "old" way of executing pipelines.
-  # TODO(robertwb): Remove once runner API supports side inputs.
-
-  def _map_task_registration(self, map_task, state_handler,
-                             data_operation_spec):
-    input_data, side_input_data, runner_sinks, process_bundle_descriptor = (
-        self._map_task_to_protos(map_task, data_operation_spec))
-    # Side inputs will be accessed over the state API.
-    for key, elements_data in side_input_data.items():
-      state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=key)
-      state_handler.Clear(state_key)
-      state_handler.Append(state_key, [elements_data])
-    return beam_fn_api_pb2.InstructionRequest(
-        instruction_id=self._next_uid(),
-        register=beam_fn_api_pb2.RegisterRequest(
-            process_bundle_descriptor=[process_bundle_descriptor])
-        ), runner_sinks, input_data
-
-  def _map_task_to_protos(self, map_task, data_operation_spec):
-    input_data = {}
-    side_input_data = {}
-    runner_sinks = {}
-
-    context = pipeline_context.PipelineContext()
-    transform_protos = {}
-    used_pcollections = {}
-
-    def uniquify(*names):
-      # An injective mapping from string* to string.
-      return ':'.join("%s:%d" % (name, len(name)) for name in names)
-
-    def pcollection_id(op_ix, out_ix):
-      if (op_ix, out_ix) not in used_pcollections:
-        used_pcollections[op_ix, out_ix] = uniquify(
-            map_task[op_ix][0], 'out', str(out_ix))
-      return used_pcollections[op_ix, out_ix]
-
-    def get_inputs(op):
-      if hasattr(op, 'inputs'):
-        inputs = op.inputs
-      elif hasattr(op, 'input'):
-        inputs = [op.input]
-      else:
-        inputs = []
-      return {'in%s' % ix: pcollection_id(*input)
-              for ix, input in enumerate(inputs)}
-
-    def get_outputs(op_ix):
-      op = map_task[op_ix][1]
-      return {tag: pcollection_id(op_ix, out_ix)
-              for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
-
-    for op_ix, (stage_name, operation) in enumerate(map_task):
-      transform_id = uniquify(stage_name)
-
-      if isinstance(operation, operation_specs.WorkerInMemoryWrite):
-        # Write this data back to the runner.
-        target_name = only_element(get_inputs(operation).keys())
-        runner_sinks[(transform_id, target_name)] = operation
-        transform_spec = beam_runner_api_pb2.FunctionSpec(
-            urn=bundle_processor.DATA_OUTPUT_URN,
-            payload=data_operation_spec.SerializeToString() \
-                if data_operation_spec is not None else None)
-
-      elif isinstance(operation, operation_specs.WorkerRead):
-        # A Read from an in-memory source is done over the data plane.
-        if (isinstance(operation.source.source,
-                       maptask_executor_runner.InMemorySource)
-            and isinstance(operation.source.source.default_output_coder(),
-                           WindowedValueCoder)):
-          target_name = only_element(get_outputs(op_ix).keys())
-          input_data[(transform_id, target_name)] = self._reencode_elements(
-              operation.source.source.read(None),
-              operation.source.source.default_output_coder())
-          transform_spec = beam_runner_api_pb2.FunctionSpec(
-              urn=bundle_processor.DATA_INPUT_URN,
-              payload=data_operation_spec.SerializeToString() \
-                  if data_operation_spec is not None else None)
-
-        else:
-          # Otherwise serialize the source and execute it there.
-          # TODO: Use SDFs with an initial impulse.
-          # The Dataflow runner harness strips the base64 encoding. do the same
-          # here until we get the same thing back that we sent in.
-          source_bytes = base64.b64decode(
-              pickler.dumps(operation.source.source))
-          transform_spec = beam_runner_api_pb2.FunctionSpec(
-              urn=bundle_processor.PYTHON_SOURCE_URN,
-              payload=source_bytes)
-
-      elif isinstance(operation, operation_specs.WorkerDoFn):
-        # Record the contents of each side input for access via the state api.
-        side_input_extras = []
-        for si in operation.side_inputs:
-          assert isinstance(si.source, iobase.BoundedSource)
-          element_coder = si.source.default_output_coder()
-          # TODO(robertwb): Actually flesh out the ViewFn API.
-          side_input_extras.append((si.tag, element_coder))
-          side_input_data[
-              bundle_processor.side_input_tag(transform_id, si.tag)] = (
-                  self._reencode_elements(
-                      si.source.read(si.source.get_range_tracker(None, None)),
-                      element_coder))
-        augmented_serialized_fn = pickler.dumps(
-            (operation.serialized_fn, side_input_extras))
-        transform_spec = beam_runner_api_pb2.FunctionSpec(
-            urn=bundle_processor.PYTHON_DOFN_URN,
-            payload=augmented_serialized_fn)
-
-      elif isinstance(operation, operation_specs.WorkerFlatten):
-        # Flatten is nice and simple.
-        transform_spec = beam_runner_api_pb2.FunctionSpec(
-            urn=bundle_processor.IDENTITY_DOFN_URN)
-
-      else:
-        raise NotImplementedError(operation)
-
-      transform_protos[transform_id] = beam_runner_api_pb2.PTransform(
-          unique_name=stage_name,
-          spec=transform_spec,
-          inputs=get_inputs(operation),
-          outputs=get_outputs(op_ix))
-
-    pcollection_protos = {
-        name: beam_runner_api_pb2.PCollection(
-            unique_name=name,
-            coder_id=context.coders.get_id(
-                map_task[op_id][1].output_coders[out_id]))
-        for (op_id, out_id), name in used_pcollections.items()
-    }
-    # Must follow creation of pcollection_protos to capture used coders.
-    context_proto = context.to_runner_api()
-    process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
-        id=self._next_uid(),
-        transforms=transform_protos,
-        pcollections=pcollection_protos,
-        coders=dict(context_proto.coders.items()),
-        windowing_strategies=dict(context_proto.windowing_strategies.items()),
-        environments=dict(context_proto.environments.items()))
-    return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
-  def _run_map_task(
-      self, map_task, control_handler, state_handler, data_plane_handler,
-      data_operation_spec):
-    registration, sinks, input_data = self._map_task_registration(
-        map_task, state_handler, data_operation_spec)
-    control_handler.push(registration)
-    process_bundle = beam_fn_api_pb2.InstructionRequest(
-        instruction_id=self._next_uid(),
-        process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
-            process_bundle_descriptor_reference=registration.register.
-            process_bundle_descriptor[0].id))
-
-    for (transform_id, name), elements in input_data.items():
-      data_out = data_plane_handler.output_stream(
-          process_bundle.instruction_id, beam_fn_api_pb2.Target(
-              primitive_transform_reference=transform_id, name=name))
-      data_out.write(elements)
-      data_out.close()
-
-    control_handler.push(process_bundle)
-    while True:
-      result = control_handler.pull()
-      if result.instruction_id == process_bundle.instruction_id:
-        if result.error:
-          raise RuntimeError(result.error)
-        expected_targets = [
-            beam_fn_api_pb2.Target(primitive_transform_reference=transform_id,
-                                   name=output_name)
-            for (transform_id, output_name), _ in sinks.items()]
-        for output in data_plane_handler.input_elements(
-            process_bundle.instruction_id, expected_targets):
-          target_tuple = (
-              output.target.primitive_transform_reference, output.target.name)
-          if target_tuple not in sinks:
-            # Unconsumed output.
-            continue
-          sink_op = sinks[target_tuple]
-          coder = sink_op.output_coders[0]
-          input_stream = create_InputStream(output.data)
-          elements = []
-          while input_stream.size() > 0:
-            elements.append(coder.get_impl().decode_from_stream(
-                input_stream, True))
-          if not sink_op.write_windowed_values:
-            elements = [e.value for e in elements]
-          for e in elements:
-            sink_op.output_buffer.append(e)
-        return
-
-  def execute_map_tasks(self, ordered_map_tasks, direct=False):
-    if direct:
-      controller = FnApiRunner.DirectController()
-    else:
-      controller = FnApiRunner.GrpcController()
-
-    try:
-      for _, map_task in ordered_map_tasks:
-        logging.info('Running %s', map_task)
-        self._run_map_task(
-            map_task, controller.control_handler, controller.state_handler,
-            controller.data_plane_handler, controller.data_operation_spec())
-    finally:
-      controller.close()
-
-  @staticmethod
-  def _reencode_elements(elements, element_coder):
-    output_stream = create_OutputStream()
-    for element in elements:
-      element_coder.get_impl().encode_to_stream(element, output_stream, True)
-    return output_stream.get()
-
   # These classes are used to interact with the worker.
 
   class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer):
@@ -1158,11 +937,14 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       self.data_server.stop(5).wait()
 
 
-class RunnerResult(maptask_executor_runner.WorkerRunnerResult):
+class RunnerResult(runner.PipelineResult):
   def __init__(self, state, metrics_by_stage):
     super(RunnerResult, self).__init__(state)
     self._metrics_by_stage = metrics_by_stage
 
+  def wait_until_finish(self, duration=None):
+    pass
+
 
 def only_element(iterable):
   element, = iterable

http://git-wip-us.apache.org/repos/asf/beam/blob/212876b8/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index ea9ed1a..249eece 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -32,6 +32,8 @@ except ImportError:
   DEFAULT_SAMPLING_PERIOD_MS = 0
 
 
+# Inherit good model test coverage from
+# maptask_executor_runner_test.MapTaskExecutorRunnerTest.
 class FnApiRunnerTest(
     maptask_executor_runner_test.MapTaskExecutorRunnerTest):
 
@@ -39,14 +41,6 @@ class FnApiRunnerTest(
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(use_grpc=False))
 
-  def test_combine_per_key(self):
-    # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
-    pass
-
-  def test_combine_per_key(self):
-    # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
-    pass
-
   def test_pardo_side_inputs(self):
     def cross_product(elem, sides):
       for side in sides: