Posted to commits@beam.apache.org by ro...@apache.org on 2017/11/22 01:02:52 UTC
[1/2] beam git commit: Closes #4104
Repository: beam
Updated Branches:
refs/heads/master 969c94390 -> b548e1f0f
Closes #4104
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b548e1f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b548e1f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b548e1f0
Branch: refs/heads/master
Commit: b548e1f0f526c38d0e6a7478c65600499fef33df
Parents: 969c943 212876b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Nov 21 17:00:59 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 17:00:59 2017 -0800
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 236 +------------------
.../runners/portability/fn_api_runner_test.py | 10 +-
2 files changed, 11 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Remove obsolete dependence of FnApiRunner on MapTaskExecutorRunner.
Posted by ro...@apache.org.
Remove obsolete dependence of FnApiRunner on MapTaskExecutorRunner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/212876b8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/212876b8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/212876b8
Branch: refs/heads/master
Commit: 212876b829e372a37b854c08460095f77a1ae547
Parents: 969c943
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Nov 8 16:24:38 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Nov 21 17:00:59 2017 -0800
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 236 +------------------
.../runners/portability/fn_api_runner_test.py | 10 +-
2 files changed, 11 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
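[Editor's note, not part of the commit message: the net effect of this change can be sketched as follows. FnApiRunner now derives directly from runner.PipelineRunner and always executes pipelines through the Runner API, instead of inheriting the legacy map-task path from MapTaskExecutorRunner. The class and method names below are taken from the diff; the bodies are an illustrative reduction, not the full implementation.]

from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.runners import runner


class FnApiRunner(runner.PipelineRunner):
  # Sketch: every pipeline is converted to its Runner API proto up front;
  # the old fallback to MapTaskExecutorRunner.run() is gone.

  def run(self, pipeline):
    MetricsEnvironment.set_metrics_supported(False)  # metrics not yet wired up
    return self.run_via_runner_api(pipeline.to_runner_api())

  def run_via_runner_api(self, pipeline_proto):
    # create_stages() and run_stages() are defined elsewhere in the module.
    return self.run_stages(*self.create_stages(pipeline_proto))
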
http://git-wip-us.apache.org/repos/asf/beam/blob/212876b8/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 838ce1e..674d523 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -17,7 +17,6 @@
"""A PipelineRunner using the SDK harness.
"""
-import base64
import collections
import copy
import logging
@@ -40,11 +39,9 @@ from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import maptask_executor_runner
-from apache_beam.runners.runner import PipelineState
+from apache_beam.runners import runner
from apache_beam.runners.worker import bundle_processor
from apache_beam.runners.worker import data_plane
-from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import sdk_worker
from apache_beam.transforms import trigger
from apache_beam.transforms.window import GlobalWindows
@@ -185,7 +182,7 @@ class _WindowGroupingBuffer(object):
yield encoded_window, output_stream.get()
-class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
+class FnApiRunner(runner.PipelineRunner):
def __init__(self, use_grpc=False, sdk_harness_factory=None):
super(FnApiRunner, self).__init__()
@@ -195,19 +192,13 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
raise ValueError('GRPC must be used if a harness factory is provided.')
self._sdk_harness_factory = sdk_harness_factory
- def has_metrics_support(self):
- return False
-
def _next_uid(self):
self._last_uid += 1
return str(self._last_uid)
def run(self, pipeline):
- MetricsEnvironment.set_metrics_supported(self.has_metrics_support())
- if pipeline._verify_runner_api_compatible():
- return self.run_via_runner_api(pipeline.to_runner_api())
- else:
- return super(FnApiRunner, self).run(pipeline)
+ MetricsEnvironment.set_metrics_supported(False)
+ return self.run_via_runner_api(pipeline.to_runner_api())
def run_via_runner_api(self, pipeline_proto):
return self.run_stages(*self.create_stages(pipeline_proto))
@@ -680,7 +671,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
finally:
controller.close()
- return RunnerResult(PipelineState.DONE, metrics_by_stage)
+ return RunnerResult(runner.PipelineState.DONE, metrics_by_stage)
def run_stage(
self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
@@ -817,218 +808,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
raise NotImplementedError(pcoll_id)
return result
- # This is the "old" way of executing pipelines.
- # TODO(robertwb): Remove once runner API supports side inputs.
-
- def _map_task_registration(self, map_task, state_handler,
- data_operation_spec):
- input_data, side_input_data, runner_sinks, process_bundle_descriptor = (
- self._map_task_to_protos(map_task, data_operation_spec))
- # Side inputs will be accessed over the state API.
- for key, elements_data in side_input_data.items():
- state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=key)
- state_handler.Clear(state_key)
- state_handler.Append(state_key, [elements_data])
- return beam_fn_api_pb2.InstructionRequest(
- instruction_id=self._next_uid(),
- register=beam_fn_api_pb2.RegisterRequest(
- process_bundle_descriptor=[process_bundle_descriptor])
- ), runner_sinks, input_data
-
- def _map_task_to_protos(self, map_task, data_operation_spec):
- input_data = {}
- side_input_data = {}
- runner_sinks = {}
-
- context = pipeline_context.PipelineContext()
- transform_protos = {}
- used_pcollections = {}
-
- def uniquify(*names):
- # An injective mapping from string* to string.
- return ':'.join("%s:%d" % (name, len(name)) for name in names)
-
- def pcollection_id(op_ix, out_ix):
- if (op_ix, out_ix) not in used_pcollections:
- used_pcollections[op_ix, out_ix] = uniquify(
- map_task[op_ix][0], 'out', str(out_ix))
- return used_pcollections[op_ix, out_ix]
-
- def get_inputs(op):
- if hasattr(op, 'inputs'):
- inputs = op.inputs
- elif hasattr(op, 'input'):
- inputs = [op.input]
- else:
- inputs = []
- return {'in%s' % ix: pcollection_id(*input)
- for ix, input in enumerate(inputs)}
-
- def get_outputs(op_ix):
- op = map_task[op_ix][1]
- return {tag: pcollection_id(op_ix, out_ix)
- for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
-
- for op_ix, (stage_name, operation) in enumerate(map_task):
- transform_id = uniquify(stage_name)
-
- if isinstance(operation, operation_specs.WorkerInMemoryWrite):
- # Write this data back to the runner.
- target_name = only_element(get_inputs(operation).keys())
- runner_sinks[(transform_id, target_name)] = operation
- transform_spec = beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_OUTPUT_URN,
- payload=data_operation_spec.SerializeToString() \
- if data_operation_spec is not None else None)
-
- elif isinstance(operation, operation_specs.WorkerRead):
- # A Read from an in-memory source is done over the data plane.
- if (isinstance(operation.source.source,
- maptask_executor_runner.InMemorySource)
- and isinstance(operation.source.source.default_output_coder(),
- WindowedValueCoder)):
- target_name = only_element(get_outputs(op_ix).keys())
- input_data[(transform_id, target_name)] = self._reencode_elements(
- operation.source.source.read(None),
- operation.source.source.default_output_coder())
- transform_spec = beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_INPUT_URN,
- payload=data_operation_spec.SerializeToString() \
- if data_operation_spec is not None else None)
-
- else:
- # Otherwise serialize the source and execute it there.
- # TODO: Use SDFs with an initial impulse.
- # The Dataflow runner harness strips the base64 encoding. do the same
- # here until we get the same thing back that we sent in.
- source_bytes = base64.b64decode(
- pickler.dumps(operation.source.source))
- transform_spec = beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.PYTHON_SOURCE_URN,
- payload=source_bytes)
-
- elif isinstance(operation, operation_specs.WorkerDoFn):
- # Record the contents of each side input for access via the state api.
- side_input_extras = []
- for si in operation.side_inputs:
- assert isinstance(si.source, iobase.BoundedSource)
- element_coder = si.source.default_output_coder()
- # TODO(robertwb): Actually flesh out the ViewFn API.
- side_input_extras.append((si.tag, element_coder))
- side_input_data[
- bundle_processor.side_input_tag(transform_id, si.tag)] = (
- self._reencode_elements(
- si.source.read(si.source.get_range_tracker(None, None)),
- element_coder))
- augmented_serialized_fn = pickler.dumps(
- (operation.serialized_fn, side_input_extras))
- transform_spec = beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.PYTHON_DOFN_URN,
- payload=augmented_serialized_fn)
-
- elif isinstance(operation, operation_specs.WorkerFlatten):
- # Flatten is nice and simple.
- transform_spec = beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.IDENTITY_DOFN_URN)
-
- else:
- raise NotImplementedError(operation)
-
- transform_protos[transform_id] = beam_runner_api_pb2.PTransform(
- unique_name=stage_name,
- spec=transform_spec,
- inputs=get_inputs(operation),
- outputs=get_outputs(op_ix))
-
- pcollection_protos = {
- name: beam_runner_api_pb2.PCollection(
- unique_name=name,
- coder_id=context.coders.get_id(
- map_task[op_id][1].output_coders[out_id]))
- for (op_id, out_id), name in used_pcollections.items()
- }
- # Must follow creation of pcollection_protos to capture used coders.
- context_proto = context.to_runner_api()
- process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
- id=self._next_uid(),
- transforms=transform_protos,
- pcollections=pcollection_protos,
- coders=dict(context_proto.coders.items()),
- windowing_strategies=dict(context_proto.windowing_strategies.items()),
- environments=dict(context_proto.environments.items()))
- return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
- def _run_map_task(
- self, map_task, control_handler, state_handler, data_plane_handler,
- data_operation_spec):
- registration, sinks, input_data = self._map_task_registration(
- map_task, state_handler, data_operation_spec)
- control_handler.push(registration)
- process_bundle = beam_fn_api_pb2.InstructionRequest(
- instruction_id=self._next_uid(),
- process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
- process_bundle_descriptor_reference=registration.register.
- process_bundle_descriptor[0].id))
-
- for (transform_id, name), elements in input_data.items():
- data_out = data_plane_handler.output_stream(
- process_bundle.instruction_id, beam_fn_api_pb2.Target(
- primitive_transform_reference=transform_id, name=name))
- data_out.write(elements)
- data_out.close()
-
- control_handler.push(process_bundle)
- while True:
- result = control_handler.pull()
- if result.instruction_id == process_bundle.instruction_id:
- if result.error:
- raise RuntimeError(result.error)
- expected_targets = [
- beam_fn_api_pb2.Target(primitive_transform_reference=transform_id,
- name=output_name)
- for (transform_id, output_name), _ in sinks.items()]
- for output in data_plane_handler.input_elements(
- process_bundle.instruction_id, expected_targets):
- target_tuple = (
- output.target.primitive_transform_reference, output.target.name)
- if target_tuple not in sinks:
- # Unconsumed output.
- continue
- sink_op = sinks[target_tuple]
- coder = sink_op.output_coders[0]
- input_stream = create_InputStream(output.data)
- elements = []
- while input_stream.size() > 0:
- elements.append(coder.get_impl().decode_from_stream(
- input_stream, True))
- if not sink_op.write_windowed_values:
- elements = [e.value for e in elements]
- for e in elements:
- sink_op.output_buffer.append(e)
- return
-
- def execute_map_tasks(self, ordered_map_tasks, direct=False):
- if direct:
- controller = FnApiRunner.DirectController()
- else:
- controller = FnApiRunner.GrpcController()
-
- try:
- for _, map_task in ordered_map_tasks:
- logging.info('Running %s', map_task)
- self._run_map_task(
- map_task, controller.control_handler, controller.state_handler,
- controller.data_plane_handler, controller.data_operation_spec())
- finally:
- controller.close()
-
- @staticmethod
- def _reencode_elements(elements, element_coder):
- output_stream = create_OutputStream()
- for element in elements:
- element_coder.get_impl().encode_to_stream(element, output_stream, True)
- return output_stream.get()
-
# These classes are used to interact with the worker.
class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer):
@@ -1158,11 +937,14 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
self.data_server.stop(5).wait()
-class RunnerResult(maptask_executor_runner.WorkerRunnerResult):
+class RunnerResult(runner.PipelineResult):
def __init__(self, state, metrics_by_stage):
super(RunnerResult, self).__init__(state)
self._metrics_by_stage = metrics_by_stage
+ def wait_until_finish(self, duration=None):
+ pass
+
def only_element(iterable):
element, = iterable
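[Editor's note: before the test-file diff, a minimal usage sketch, assumed rather than taken from the commit, showing how a pipeline targets the rewritten runner. The constructor arguments mirror those used in fn_api_runner_test.py below.]

import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner
from apache_beam.testing.util import assert_that, equal_to

p = beam.Pipeline(runner=fn_api_runner.FnApiRunner(use_grpc=False))
doubled = (p
           | beam.Create([1, 2, 3])
           | beam.Map(lambda x: x * 2))
assert_that(doubled, equal_to([2, 4, 6]))
# After this change, RunnerResult.wait_until_finish() is a no-op.
p.run().wait_until_finish()
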
http://git-wip-us.apache.org/repos/asf/beam/blob/212876b8/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index ea9ed1a..249eece 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -32,6 +32,8 @@ except ImportError:
DEFAULT_SAMPLING_PERIOD_MS = 0
+# Inherit good model test coverage from
+# maptask_executor_runner_test.MapTaskExecutorRunnerTest.
class FnApiRunnerTest(
maptask_executor_runner_test.MapTaskExecutorRunnerTest):
@@ -39,14 +41,6 @@ class FnApiRunnerTest(
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(use_grpc=False))
- def test_combine_per_key(self):
- # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
- pass
-
- def test_combine_per_key(self):
- # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
- pass
-
def test_pardo_side_inputs(self):
def cross_product(elem, sides):
for side in sides:
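[Editor's note: the test_pardo_side_inputs hunk above is truncated context. A hedged sketch of the pattern it exercises follows; the pipeline is hypothetical and not from the commit. Per the removed code's comments, side inputs are served to the worker over the state API rather than the deleted map-task path.]

import apache_beam as beam
from apache_beam.runners.portability import fn_api_runner

def cross_product(elem, sides):
  # Pair each main-input element with every element of the side input.
  for side in sides:
    yield elem, side

p = beam.Pipeline(runner=fn_api_runner.FnApiRunner(use_grpc=False))
main = p | 'main' >> beam.Create(['a', 'b'])
side = p | 'side' >> beam.Create(['x', 'y'])
main | beam.FlatMap(cross_product, beam.pvalue.AsList(side))
p.run().wait_until_finish()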