Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/23 03:05:10 UTC
[44/50] beam git commit: Remove fn api bundle descriptor translation.
Remove fn api bundle descriptor translation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d6ad199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d6ad199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d6ad199
Branch: refs/heads/gearpump-runner
Commit: 5d6ad19958d0a2394f9e33720a04cc954279a7e7
Parents: 7645c44
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:44:23 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 191 +------------------
.../runners/portability/fn_api_runner_test.py | 18 +-
.../apache_beam/runners/worker/sdk_worker.py | 150 ---------------
3 files changed, 4 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
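In short: FnApiRunner previously chose between two bundle-descriptor
translation paths via a use_runner_protos constructor flag. This commit
deletes the legacy fn-API path (_map_task_to_fn_protos on the runner side
and create_execution_tree_from_fn_api on the worker side), leaving the
runner-API proto path as the only implementation. A minimal usage sketch,
assuming the module layout shown in the diff below; the pipeline contents
are illustrative, not from the commit:

    import apache_beam as beam
    from apache_beam.runners.portability import fn_api_runner

    # Before this commit, callers picked a proto dialect:
    #   fn_api_runner.FnApiRunner(use_runner_protos=True)   # runner-API protos
    #   fn_api_runner.FnApiRunner(use_runner_protos=False)  # legacy fn-API protos
    # After it, the flag is gone and runner-API protos are always used:
    p = beam.Pipeline(runner=fn_api_runner.FnApiRunner())
    (p
     | beam.Create([1, 2, 3])
     | beam.Map(lambda x: x * x))
    p.run()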
http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a27e293..b45ff76 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -115,13 +115,9 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
- def __init__(self, use_runner_protos=False):
+ def __init__(self):
super(FnApiRunner, self).__init__()
self._last_uid = -1
- if use_runner_protos:
- self._map_task_to_protos = self._map_task_to_runner_protos
- else:
- self._map_task_to_protos = self._map_task_to_fn_protos
def has_metrics_support(self):
return False
@@ -145,7 +141,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
process_bundle_descriptor=[process_bundle_descriptor])
), runner_sinks, input_data
- def _map_task_to_runner_protos(self, map_task, data_operation_spec):
+ def _map_task_to_protos(self, map_task, data_operation_spec):
input_data = {}
side_input_data = {}
runner_sinks = {}
@@ -265,189 +261,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
environments=dict(context_proto.environments.items()))
return input_data, side_input_data, runner_sinks, process_bundle_descriptor
- def _map_task_to_fn_protos(self, map_task, data_operation_spec):
-
- input_data = {}
- side_input_data = {}
- runner_sinks = {}
- transforms = []
- transform_index_to_id = {}
-
- # Maps coders to new coder objects and references.
- coders = {}
-
- def coder_id(coder):
- if coder not in coders:
- coders[coder] = beam_fn_api_pb2.Coder(
- function_spec=sdk_worker.pack_function_spec_data(
- json.dumps(coder.as_cloud_object()),
- sdk_worker.PYTHON_CODER_URN, id=self._next_uid()))
-
- return coders[coder].function_spec.id
-
- def output_tags(op):
- return getattr(op, 'output_tags', ['out'])
-
- def as_target(op_input):
- input_op_index, input_output_index = op_input
- input_op = map_task[input_op_index][1]
- return {
- 'ignored_input_tag':
- beam_fn_api_pb2.Target.List(target=[
- beam_fn_api_pb2.Target(
- primitive_transform_reference=transform_index_to_id[
- input_op_index],
- name=output_tags(input_op)[input_output_index])
- ])
- }
-
- def outputs(op):
- return {
- tag: beam_fn_api_pb2.PCollection(coder_reference=coder_id(coder))
- for tag, coder in zip(output_tags(op), op.output_coders)
- }
-
- for op_ix, (stage_name, operation) in enumerate(map_task):
- transform_id = transform_index_to_id[op_ix] = self._next_uid()
- if isinstance(operation, operation_specs.WorkerInMemoryWrite):
- # Write this data back to the runner.
- fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_OUTPUT_URN,
- id=self._next_uid())
- if data_operation_spec:
- fn.data.Pack(data_operation_spec)
- inputs = as_target(operation.input)
- side_inputs = {}
- runner_sinks[(transform_id, 'out')] = operation
-
- elif isinstance(operation, operation_specs.WorkerRead):
- # A Read is either translated to a direct injection of windowed values
- # into the sdk worker, or an injection of the source object into the
- # sdk worker as data followed by an SDF that reads that source.
- if (isinstance(operation.source.source,
- maptask_executor_runner.InMemorySource)
- and isinstance(operation.source.source.default_output_coder(),
- WindowedValueCoder)):
- output_stream = create_OutputStream()
- element_coder = (
- operation.source.source.default_output_coder().get_impl())
- # Re-encode the elements in the nested context and
- # concatenate them together
- for element in operation.source.source.read(None):
- element_coder.encode_to_stream(element, output_stream, True)
- target_name = self._next_uid()
- input_data[(transform_id, target_name)] = output_stream.get()
- fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_INPUT_URN,
- id=self._next_uid())
- if data_operation_spec:
- fn.data.Pack(data_operation_spec)
- inputs = {target_name: beam_fn_api_pb2.Target.List()}
- side_inputs = {}
- else:
- # Read the source object from the runner.
- source_coder = beam.coders.DillCoder()
- input_transform_id = self._next_uid()
- output_stream = create_OutputStream()
- source_coder.get_impl().encode_to_stream(
- GlobalWindows.windowed_value(operation.source),
- output_stream,
- True)
- target_name = self._next_uid()
- input_data[(input_transform_id, target_name)] = output_stream.get()
- input_ptransform = beam_fn_api_pb2.PrimitiveTransform(
- id=input_transform_id,
- function_spec=beam_fn_api_pb2.FunctionSpec(
- urn=sdk_worker.DATA_INPUT_URN,
- id=self._next_uid()),
- # TODO(robertwb): Possible name collision.
- step_name=stage_name + '/inject_source',
- inputs={target_name: beam_fn_api_pb2.Target.List()},
- outputs={
- 'out':
- beam_fn_api_pb2.PCollection(
- coder_reference=coder_id(source_coder))
- })
- if data_operation_spec:
- input_ptransform.function_spec.data.Pack(data_operation_spec)
- transforms.append(input_ptransform)
-
- # Read the elements out of the source.
- fn = sdk_worker.pack_function_spec_data(
- OLDE_SOURCE_SPLITTABLE_DOFN_DATA,
- sdk_worker.PYTHON_DOFN_URN,
- id=self._next_uid())
- inputs = {
- 'ignored_input_tag':
- beam_fn_api_pb2.Target.List(target=[
- beam_fn_api_pb2.Target(
- primitive_transform_reference=input_transform_id,
- name='out')
- ])
- }
- side_inputs = {}
-
- elif isinstance(operation, operation_specs.WorkerDoFn):
- fn = sdk_worker.pack_function_spec_data(
- operation.serialized_fn,
- sdk_worker.PYTHON_DOFN_URN,
- id=self._next_uid())
- inputs = as_target(operation.input)
- # Store the contents of each side input for state access.
- for si in operation.side_inputs:
- assert isinstance(si.source, iobase.BoundedSource)
- element_coder = si.source.default_output_coder()
- view_id = self._next_uid()
- # TODO(robertwb): Actually flesh out the ViewFn API.
- side_inputs[si.tag] = beam_fn_api_pb2.SideInput(
- view_fn=sdk_worker.serialize_and_pack_py_fn(
- element_coder, urn=sdk_worker.PYTHON_ITERABLE_VIEWFN_URN,
- id=view_id))
- # Re-encode the elements in the nested context and
- # concatenate them together
- output_stream = create_OutputStream()
- for element in si.source.read(
- si.source.get_range_tracker(None, None)):
- element_coder.get_impl().encode_to_stream(
- element, output_stream, True)
- elements_data = output_stream.get()
- side_input_data[view_id] = elements_data
-
- elif isinstance(operation, operation_specs.WorkerFlatten):
- fn = sdk_worker.pack_function_spec_data(
- operation.serialized_fn,
- sdk_worker.IDENTITY_DOFN_URN,
- id=self._next_uid())
- inputs = {
- 'ignored_input_tag':
- beam_fn_api_pb2.Target.List(target=[
- beam_fn_api_pb2.Target(
- primitive_transform_reference=transform_index_to_id[
- input_op_index],
- name=output_tags(map_task[input_op_index][1])[
- input_output_index])
- for input_op_index, input_output_index in operation.inputs
- ])
- }
- side_inputs = {}
-
- else:
- raise TypeError(operation)
-
- ptransform = beam_fn_api_pb2.PrimitiveTransform(
- id=transform_id,
- function_spec=fn,
- step_name=stage_name,
- inputs=inputs,
- side_inputs=side_inputs,
- outputs=outputs(operation))
- transforms.append(ptransform)
-
- process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
- id=self._next_uid(),
- coders=coders.values(),
- primitive_transform=transforms)
-
- return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
def _run_map_task(
self, map_task, control_handler, state_handler, data_plane_handler,
data_operation_spec):
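One pattern worth noting from the removed code above: elements destined
for the worker are re-encoded in the nested coder context and concatenated
into a single byte string. A standalone sketch of that encoding step,
using the same create_OutputStream/encode_to_stream calls as the diff;
the coder and elements here are illustrative, not from the commit:

    from apache_beam.coders import coders
    from apache_beam.coders.coder_impl import create_OutputStream

    coder_impl = coders.VarIntCoder().get_impl()
    output_stream = create_OutputStream()
    for element in [1, 2, 3]:
      # True selects the nested encoding context, so the concatenated
      # elements can be split apart again on the other side.
      coder_impl.encode_to_stream(element, output_stream, True)
    elements_data = output_stream.get()  # bytes handed to the data plane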
http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e2eae26..9159035 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,26 +23,12 @@ from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.portability import maptask_executor_runner_test
-class FnApiRunnerTestWithRunnerProtos(
+class FnApiRunnerTest(
maptask_executor_runner_test.MapTaskExecutorRunnerTest):
def create_pipeline(self):
return beam.Pipeline(
- runner=fn_api_runner.FnApiRunner(use_runner_protos=True))
-
- def test_combine_per_key(self):
- # TODO(robertwb): Implement PGBKCV operation.
- pass
-
- # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
-
-
-class FnApiRunnerTestWithFnProtos(
- maptask_executor_runner_test.MapTaskExecutorRunnerTest):
-
- def create_pipeline(self):
- return beam.Pipeline(
- runner=fn_api_runner.FnApiRunner(use_runner_protos=False))
+ runner=fn_api_runner.FnApiRunner())
def test_combine_per_key(self):
# TODO(robertwb): Implement PGBKCV operation.
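With the two parameterized suites collapsed into one, the test module now
reads as below; a condensed sketch mirroring the hunk above (the class
still inherits every test from MapTaskExecutorRunnerTest):

    import apache_beam as beam
    from apache_beam.runners.portability import fn_api_runner
    from apache_beam.runners.portability import maptask_executor_runner_test

    class FnApiRunnerTest(
        maptask_executor_runner_test.MapTaskExecutorRunnerTest):

      def create_pipeline(self):
        return beam.Pipeline(runner=fn_api_runner.FnApiRunner())

      def test_combine_per_key(self):
        # TODO(robertwb): Implement PGBKCV operation.
        pass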
http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index a2c9f42..d135984 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -196,25 +196,6 @@ def pack_function_spec_data(value, urn, id=None):
# pylint: enable=redefined-builtin
-# TODO(vikasrk): Consistently use same format everywhere.
-def load_compressed(compressed_data):
- """Returns a decompressed and deserialized python object."""
- # Note: SDK uses ``pickler.dumps`` to serialize certain python objects
- # (like sources), which involves serialization, compression and base64
- # encoding. We cannot directly use ``pickler.loads`` for
- # deserialization, as the runner would have already base64 decoded the
- # data. So we only need to decompress and deserialize.
-
- data = zlib.decompress(compressed_data)
- try:
- return dill.loads(data)
- except Exception: # pylint: disable=broad-except
- dill.dill._trace(True) # pylint: disable=protected-access
- return dill.loads(data)
- finally:
- dill.dill._trace(False) # pylint: disable=protected-access
-
-
def memoize(func):
cache = {}
missing = object()
@@ -324,12 +305,6 @@ class SdkWorker(object):
return response
def create_execution_tree(self, descriptor):
- if descriptor.transforms:
- return self.create_execution_tree_from_runner_api(descriptor)
- else:
- return self.create_execution_tree_from_fn_api(descriptor)
-
- def create_execution_tree_from_runner_api(self, descriptor):
# TODO(robertwb): Figure out the correct prefix to use for output counters
# from StateSampler.
counter_factory = counters.CounterFactory()
@@ -368,131 +343,6 @@ class SdkWorker(object):
for transform_id in sorted(
descriptor.transforms, key=topological_height, reverse=True)]
- def create_execution_tree_from_fn_api(self, descriptor):
- # TODO(vikasrk): Add an id field to Coder proto and use that instead.
- coders = {coder.function_spec.id: operation_specs.get_coder_from_spec(
- json.loads(unpack_function_spec_data(coder.function_spec)))
- for coder in descriptor.coders}
-
- counter_factory = counters.CounterFactory()
- # TODO(robertwb): Figure out the correct prefix to use for output counters
- # from StateSampler.
- state_sampler = statesampler.StateSampler(
- 'fnapi-step%s-' % descriptor.id, counter_factory)
- consumers = collections.defaultdict(lambda: collections.defaultdict(list))
- ops_by_id = {}
- reversed_ops = []
-
- for transform in reversed(descriptor.primitive_transform):
- # TODO(robertwb): Figure out how to plumb through the operation name (e.g.
- # "s3") from the service through the FnAPI so that msec counters can be
- # reported and correctly plumbed through the service and the UI.
- operation_name = 'fnapis%s' % transform.id
-
- def only_element(iterable):
- element, = iterable
- return element
-
- if transform.function_spec.urn == DATA_OUTPUT_URN:
- target = beam_fn_api_pb2.Target(
- primitive_transform_reference=transform.id,
- name=only_element(transform.outputs.keys()))
-
- op = DataOutputOperation(
- operation_name,
- transform.step_name,
- consumers[transform.id],
- counter_factory,
- state_sampler,
- coders[only_element(transform.outputs.values()).coder_reference],
- target,
- self.data_channel_factory.create_data_channel(
- transform.function_spec))
-
- elif transform.function_spec.urn == DATA_INPUT_URN:
- target = beam_fn_api_pb2.Target(
- primitive_transform_reference=transform.id,
- name=only_element(transform.inputs.keys()))
- op = DataInputOperation(
- operation_name,
- transform.step_name,
- consumers[transform.id],
- counter_factory,
- state_sampler,
- coders[only_element(transform.outputs.values()).coder_reference],
- target,
- self.data_channel_factory.create_data_channel(
- transform.function_spec))
-
- elif transform.function_spec.urn == PYTHON_DOFN_URN:
- def create_side_input(tag, si):
- # TODO(robertwb): Extract windows (and keys) out of element data.
- return operation_specs.WorkerSideInputSource(
- tag=tag,
- source=SideInputSource(
- self.state_handler,
- beam_fn_api_pb2.StateKey.MultimapSideInput(
- key=si.view_fn.id.encode('utf-8')),
- coder=unpack_and_deserialize_py_fn(si.view_fn)))
- output_tags = list(transform.outputs.keys())
- spec = operation_specs.WorkerDoFn(
- serialized_fn=unpack_function_spec_data(transform.function_spec),
- output_tags=output_tags,
- input=None,
- side_inputs=[create_side_input(tag, si)
- for tag, si in transform.side_inputs.items()],
- output_coders=[coders[transform.outputs[out].coder_reference]
- for out in output_tags])
-
- op = operations.DoOperation(operation_name, spec, counter_factory,
- state_sampler)
- # TODO(robertwb): Move these to the constructor.
- op.step_name = transform.step_name
- for tag, op_consumers in consumers[transform.id].items():
- for consumer in op_consumers:
- op.add_receiver(
- consumer, output_tags.index(tag))
-
- elif transform.function_spec.urn == IDENTITY_DOFN_URN:
- op = operations.FlattenOperation(operation_name, None, counter_factory,
- state_sampler)
- # TODO(robertwb): Move these to the constructor.
- op.step_name = transform.step_name
- for tag, op_consumers in consumers[transform.id].items():
- for consumer in op_consumers:
- op.add_receiver(consumer, 0)
-
- elif transform.function_spec.urn == PYTHON_SOURCE_URN:
- source = load_compressed(unpack_function_spec_data(
- transform.function_spec))
- # TODO(vikasrk): Remove this once custom source is implemented with
- # splittable dofn via the data plane.
- spec = operation_specs.WorkerRead(
- iobase.SourceBundle(1.0, source, None, None),
- [WindowedValueCoder(source.default_output_coder())])
- op = operations.ReadOperation(operation_name, spec, counter_factory,
- state_sampler)
- op.step_name = transform.step_name
- output_tags = list(transform.outputs.keys())
- for tag, op_consumers in consumers[transform.id].items():
- for consumer in op_consumers:
- op.add_receiver(
- consumer, output_tags.index(tag))
-
- else:
- raise NotImplementedError
-
- # Record consumers.
- for _, inputs in transform.inputs.items():
- for target in inputs.target:
- consumers[target.primitive_transform_reference][target.name].append(
- op)
-
- reversed_ops.append(op)
- ops_by_id[transform.id] = op
-
- return list(reversed(reversed_ops))
-
def process_bundle(self, request, instruction_id):
ops = self.create_execution_tree(
self.fns[request.process_bundle_descriptor_reference])
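On the worker side, the proto-dialect dispatch is gone as well; a
condensed before/after sketch of SdkWorker.create_execution_tree, drawn
from the hunks above (method bodies elided; the _old suffix exists only
in this sketch):

    class SdkWorker(object):

      # Before: dispatch on which dialect the descriptor used.
      def create_execution_tree_old(self, descriptor):
        if descriptor.transforms:  # runner-API descriptors populate .transforms
          return self.create_execution_tree_from_runner_api(descriptor)
        # Legacy descriptors populated .primitive_transform instead.
        return self.create_execution_tree_from_fn_api(descriptor)

      # After: runner-API descriptors are the only input; the former
      # create_execution_tree_from_runner_api body now lives here.
      def create_execution_tree(self, descriptor):
        ...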